Introduction to Reactive Programming with RxJS

RxJS is a reactive programming library that lets us use the observer pattern. This pattern lets us watch for changes in our programs and run code in response. Observables are the entities that emit values when they pick up changes. Observers are the entities that receive the data sent by observables.

In this article, we’ll take a quick look at how to create new observables to send new values to observers.

Components of RxJS

RxJS has a few main parts. They are:

  • Observables: entities that pick up changes and send them to observers
  • Observers: entities that watch for new data pushed from observables
  • Operators: functions that transform the emitted values with operations like map, filter, concat, reduce, etc.
  • Subjects: event emitters that broadcast data to multiple observers
  • Schedulers: centralized dispatchers that control concurrency. They let us coordinate when computation happens.

Creating Observables

We need observables to send data to observers. RxJS provides an Observable constructor whose callback receives a subscriber that we use to emit data.

For example, we can write:

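import { Observable } from "rxjs";

const observable = new Observable(subscriber => {
  // Emitted synchronously, as soon as an observer subscribes
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    // Emitted asynchronously, after 1 second
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});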

The observable above emits 1 and 2 immediately, and 3 after 1 second. The subscriber parameter is the object we use to emit data to the observers.

Calling the subscriber's complete method signals that the observable has finished, so the subscriber emits no more data after it.

We can use the observable to get the emitted values as follows:

observable.subscribe(val => console.log(val));

Also, we can pass an object to the subscribe method with a next method for getting the emitted values, an error method for getting errors, and a complete method to run something when the observable has finished sending data:

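observable.subscribe({
  next(x) {
    // Called for every emitted value
    console.log(x);
  },
  error(err) {
    // Called if the observable reports an error
    console.error(err);
  },
  complete() {
    // Called once, when the observable has finished
    console.log("done");
  }
});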

Pull versus Push

Observables are push systems: the producer decides when to send data, and the observer reacts when it arrives. In a pull system, such as a function call, the consumer decides when to ask for data.

An observable is a lazy producer of multiple values. Nothing is evaluated until an observer subscribes.
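
To make the contrast concrete, here is a minimal sketch in plain JavaScript, without RxJS. The names pullNumbers and pushNumbers are made up for illustration. A generator behaves like a pull system because the consumer asks for each value, while a callback-based producer behaves like a push system because the producer decides when to call the consumer.

```javascript
// Pull: the consumer decides when to ask for the next value.
function* pullNumbers() {
  yield 1;
  yield 2;
  yield 3;
}

const pulled = [];
const iterator = pullNumbers();
let step = iterator.next(); // the consumer requests a value
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next();
}

// Push: the producer decides when to hand values to the consumer.
// pushNumbers is a hypothetical stand-in for an observable's producer.
function pushNumbers(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}

const pushed = [];
let completed = false;
pushNumbers({
  next: (value) => pushed.push(value),
  complete: () => {
    completed = true;
  },
});

console.log(pulled); // [1, 2, 3]
console.log(pushed); // [1, 2, 3]
```

Both sides end up with the same values. The difference is only in who controls the timing.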

Observables are Like Functions

Observables are like functions in that they both return data for other entities. We can use the returned data in any place we wish.

For example, if we have the following function:

const foo = () => 1

and an observable:

const foo = new Observable(subscriber => {
  subscriber.next(1);
});

foo.subscribe(x => {
  console.log(x);
});

Then they both give us the value 1. However, as we saw before, an observable can also give us more than one value, which a function can't do.

Also, like functions, they can be either synchronous or asynchronous. From the first example, we have:

subscriber.next(1);
subscriber.next(2);

being run line by line, while:

setTimeout(() => {
  subscriber.next(3);
  subscriber.complete();
}, 1000);

waits 1 second before it runs, which means it’s asynchronous.

Parts of an Observable

Observables are created with the Observable constructor, and we subscribe to them with an observer.

An observable delivers next, error, or complete notifications to observers. Once it sends error or complete, its execution ends and its resources are disposed of automatically.

Creating an Observable

The Observable constructor takes a callback function with a subscriber parameter, which gives us the subscriber object we use to emit values.

For example, we can write:

const observable = new Observable((subscriber) => {
  const id = setInterval(() => {
    subscriber.next("foo");
  }, 1000);
});

to emit 'foo' every second to observers.

Subscribing to Observables

We call the subscribe method of the Observable object to subscribe to the values it pushes. For instance, we can write:

observable.subscribe(x => console.log(x));

to get the latest values.

Subscribe calls aren't shared among multiple observers of the same Observable. Each call to subscribe starts its own independent execution for the observer that was passed in.
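
The sketch below illustrates that independence in plain JavaScript. It does not use RxJS: makeObservable is a hypothetical, heavily simplified stand-in in which subscribing just runs the producer function for that one observer, which is roughly how a plain (unicast) observable behaves.

```javascript
// makeObservable is a hypothetical, simplified stand-in for RxJS's Observable.
// Subscribing simply runs the producer function for that one observer.
function makeObservable(producer) {
  return {
    subscribe(next) {
      producer({ next });
    },
  };
}

let executions = 0;
const counter = makeObservable((subscriber) => {
  executions += 1; // runs once per subscribe call
  subscriber.next(`value from execution ${executions}`);
});

const received = [];
counter.subscribe((value) => received.push(`A: ${value}`));
counter.subscribe((value) => received.push(`B: ${value}`));

console.log(received);
// ["A: value from execution 1", "B: value from execution 2"]
```

The producer runs twice, once for each subscribe call, so the two observers never share an execution.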

Executing Observables

The code inside the callback of:

const observable = new Observable((subscriber) => {
  // ...
});

is the observable execution. It runs lazily, and only once for each observer that subscribes.

An execution can deliver the following notifications:

  • next: sends a value to observers
  • error: sends an error or exception to observers
  • complete: sends no value and signals that the execution has finished

Observables adhere strictly to the Observable contract, so once complete is called, they won't send any more data.

For example:

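const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Not delivered: complete was already called
});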

4 won't be sent to observers since complete has already been called.
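
One way to picture how this rule could be enforced is the plain JavaScript sketch below. It is a simplified model under our own assumptions, not RxJS's actual implementation: createObservable and the closed flag are hypothetical names, and the wrapper simply drops any notification that arrives after complete or error.

```javascript
// createObservable is a hypothetical, simplified model, not RxJS's real code.
// It wraps the observer so nothing is delivered after complete or error.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      const subscriber = {
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        },
      };
      producer(subscriber);
    },
  };
}

const numbers = createObservable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // ignored: the execution has already completed
});

const log = [];
numbers.subscribe({
  next: (value) => log.push(value),
  complete: () => log.push("done"),
});

console.log(log); // [1, 2, 3, "done"]
```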

Disposing Observable Executions

We can call the unsubscribe method of a subscription to stop subscribing to the observable. After it's called, the observer stops watching the Observable for more changes, and the resources needed to do the watching are disposed of.

The callback we pass to the Observable constructor can return an unsubscribe function that holds any cleanup code we want to run. For example, we can write:

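const observable = new Observable(function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next("hi");
  }, 1000);

  // Cleanup code that runs when the observer unsubscribes
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});

const subscription = observable.subscribe(x => console.log(x));

// Stop listening after 3 seconds
setTimeout(() => {
  subscription.unsubscribe();
}, 3000);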

The code above has an unsubscribe function that calls clearInterval with the ID returned by setInterval. Then we call subscribe to subscribe to our observable, which returns a subscription object with an unsubscribe method. That method is called after 3 seconds in the setTimeout callback.

With RxJS, we can create Observables to emit values with the subscriber object. Then we can subscribe to observables to watch for values.

We can also create a function that cleans up when unsubscribing and call unsubscribe afterwards when we don’t want to subscribe to an Observable anymore.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
