RxJS is a reactive programming library that lets us use the observer pattern. This pattern lets us watch for changes in our programs and run code accordingly. Observables are the entities that emit values when they pick up changes. Observers are the entities that receive the data sent by observables.
In this article, we’ll take a quick look at how to create new observables to send new values to observers.
Components of RxJS
RxJS has a few parts. They are:
- Observables: entities that pick up changes and send them to observers
- Observers: entities that watch for new data pushed from observables
- Operators: functions that let us deal with collections of values using operations like map, filter, concat, reduce, etc.
- Subjects: event emitters that broadcast data to multiple observers
- Schedulers: centralized dispatchers that control concurrency. They let us coordinate when computation happens.
Creating Observables
We need observables to send data to observers. RxJS provides an Observable constructor that lets us emit data through a subscriber.
For example, we can write:
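import { Observable } from "rxjs";

const observable = new Observable(subscriber => {
  // These two values are emitted as soon as an observer subscribes
  subscriber.next(1);
  subscriber.next(2);
  // The last value is emitted 1 second later, then the observable completes
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});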
The observable above will emit 1 and 2 immediately through the subscriber, and 3 after 1 second. The subscriber is the object that we use to emit data to the observers. Calling complete stops the observable from using the subscriber to emit more data.
We can subscribe to the observable to get the emitted values as follows:
observable.subscribe(val => console.log(val));
Also, we can pass an object to the subscribe method with a next method for getting the emitted values, an error method for getting errors, and a complete method to run something when the observable is done sending data:
observable.subscribe({
  next(x) {
    console.log(x);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.log("done");
  }
});
Pull versus Push
Observables are push systems: the producer decides when to send data and pushes it to the observer, instead of the consumer pulling data when it wants it.
An observable is a producer of multiple values. It’s also lazy: nothing is evaluated until an observer subscribes.
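To make the distinction concrete, here is a minimal sketch in plain JavaScript that doesn’t use RxJS at all. A generator stands in for a pull system and a callback-based producer stands in for a push system. The names pullSource and pushSource are made up for this illustration.

```javascript
// Pull: the consumer decides when to ask for the next value.
function* pullSource() {
  yield 1;
  yield 2;
  yield 3;
}

const pulled = [];
const iterator = pullSource();
pulled.push(iterator.next().value); // the consumer asks for a value
pulled.push(iterator.next().value); // and asks again when it is ready

// Push: the producer decides when to deliver values to the consumer.
function pushSource(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
}

const pushed = [];
pushSource({ next: (value) => pushed.push(value) });

console.log(pulled); // [1, 2]
console.log(pushed); // [1, 2, 3]
```

In the pull half, the third value is never produced because the consumer never asks for it. In the push half, the consumer has no say: it receives whatever the producer sends.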
Observables are Like Functions
Observables are like functions in that they both return data for other entities. We can use the returned data in any place we wish.
For example, if we have the following function:
const foo = () => 1
and an observable:
const foo = new Observable(subscriber => {
  subscriber.next(1);
});
foo.subscribe(x => {
  console.log(x);
});
Then calling the function and subscribing to the observable both give us the value 1. The difference, as we saw before, is that an observable can also deliver more than one value, which a function can’t do.
Also, they can either be synchronous or asynchronous like functions. From the first example, we have:
subscriber.next(1);
subscriber.next(2);
being run synchronously, line by line, while:
setTimeout(() => {
  subscriber.next(3);
  subscriber.complete();
}, 1000);
waits 1 second before it runs, which means it’s asynchronous.
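The ordering can be seen with a rough sketch that takes the function analogy literally. Here a plain function, with the hypothetical name subscribeToNumbers, plays the role of the observable. It isn’t an RxJS API, and the 10 millisecond delay is an arbitrary choice.

```javascript
// A plain function standing in for an observable (hypothetical, not RxJS).
function subscribeToNumbers(observer) {
  observer.next(1); // delivered synchronously
  observer.next(2); // delivered synchronously
  setTimeout(() => observer.next(3), 10); // delivered later, asynchronously
}

const log = [];
log.push("before");
subscribeToNumbers({ next: (value) => log.push(value) });
log.push("after");

// At this point only the synchronous values have arrived.
console.log(log); // ["before", 1, 2, "after"]
```

The values 1 and 2 land between "before" and "after" because they are delivered during the call itself. The value 3 only arrives once the timer fires.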
Parts of an Observable
Observables are created with the Observable constructor, and we subscribe to them with an observer.
An Observable delivers next, error, or complete notifications to observers. Once error or complete is delivered, the subscription is cleaned up automatically.
Creating an Observable
The Observable constructor takes a callback function with a subscriber parameter, which gives us the subscriber object that we use to emit values.
For example, we can write:
const observable = new Observable((subscriber) => {
  // Emit "foo" once every second
  const id = setInterval(() => {
    subscriber.next("foo");
  }, 1000);
});
to emit 'foo' every second to observers.
Subscribing to Observables
We call the subscribe method of the Observable object to subscribe to the values it pushes. For instance, we can write:
observable.subscribe(x => console.log(x));
to get the latest values.
Calls to subscribe aren’t shared among multiple observers of the same Observable. Each call to subscribe starts its own independent execution that delivers values only to that observer.
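As a rough illustration of why subscriptions are independent, the sketch below uses a tiny stand-in class. MiniObservable is a hypothetical name invented for this example and is much simpler than the real RxJS class. It only shows the idea that each subscribe call runs the producer function again.

```javascript
// MiniObservable is a hypothetical stand-in, not the RxJS class.
class MiniObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  subscribe(next) {
    // Every call runs the producer function again for this observer only.
    this.subscribeFn({ next });
  }
}

let executions = 0;
const numbers = new MiniObservable((subscriber) => {
  executions += 1;
  subscriber.next(executions);
});

const received = [];
numbers.subscribe((value) => received.push(`A got ${value}`));
numbers.subscribe((value) => received.push(`B got ${value}`));

console.log(executions); // 2
console.log(received); // ["A got 1", "B got 2"]
```

The producer function ran twice, once per subscriber, and each observer only saw the value from its own run.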
Executing Observables
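The function that we pass to the Observable constructor:
const observable = new Observable((subscriber) => {
  //...
});
is the Observable execution. It’s lazy: it doesn’t run when the Observable is created, only when an observer subscribes, and it runs once for each subscription.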
An execution can deliver the following notifications:
- next: sends a value to observers
- error: sends an error or exception to observers
- complete: signals that no more values will be sent, and carries no value
Observables adhere strictly to the Observable contract, so once error or complete is delivered, no more data is sent.
For example:
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // ignored, because complete was already called
});
4 won’t be sent to observers since complete has already been called.
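One way to picture how such a contract could be enforced is a wrapper that remembers whether the stream has ended. The sketch below is only an assumption about the general idea, not the actual RxJS implementation, and createSafeSubscriber is a name made up for this example.

```javascript
// createSafeSubscriber is a hypothetical helper written for this example.
function createSafeSubscriber(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true;
      observer.complete();
    },
  };
}

const events = [];
const subscriber = createSafeSubscriber({
  next: (value) => events.push(value),
  error: (err) => events.push(`error: ${err}`),
  complete: () => events.push("done"),
});

subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // ignored: the subscriber is already closed

console.log(events); // [1, 2, 3, "done"]
```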
Disposing Observable Executions
We can call the unsubscribe method of a subscription to stop subscribing to the observable. After it’s called, the observer stops receiving values and the resources needed to do the watching are disposed of.
The function that we pass to the Observable constructor can return an unsubscribe function that holds any cleanup code we need. For example, we can write:
const observable = new Observable(function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next("hi");
  }, 1000);
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => {
  subscription.unsubscribe();
}, 3000);
The code above returns an unsubscribe function that calls clearInterval with the ID returned by setInterval. Then we called subscribe to subscribe to our observable, which returns a subscription object with the unsubscribe method. This method is called after 3 seconds in the callback for setTimeout.
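To show how the returned cleanup function and the unsubscribe method fit together, here is a simplified sketch that doesn’t use RxJS. The helper subscribeWithTeardown is hypothetical and leaves out most of what a real subscription does. It only illustrates that unsubscribing runs the function the producer returned.

```javascript
// subscribeWithTeardown is a hypothetical helper, not part of RxJS.
function subscribeWithTeardown(producer, next) {
  let active = true;
  const teardown = producer({
    next(value) {
      if (active) next(value);
    },
  });
  return {
    unsubscribe() {
      if (!active) return; // calling unsubscribe twice is harmless
      active = false;
      if (typeof teardown === "function") teardown();
    },
  };
}

let cleanedUp = false;
const subscription = subscribeWithTeardown(
  (subscriber) => {
    const intervalId = setInterval(() => subscriber.next("hi"), 1000);
    // The returned function is the cleanup code.
    return () => {
      clearInterval(intervalId);
      cleanedUp = true;
    };
  },
  (value) => console.log(value)
);

subscription.unsubscribe(); // stops the interval right away
console.log(cleanedUp); // true
```

Because unsubscribe is called before the first second has passed, no "hi" is logged and the interval is cleared, so nothing is left running.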
With RxJS, we can create Observables to emit values with the subscriber object. Then we can subscribe to observables to watch for values.
We can also return a function that cleans up when unsubscribing, and call unsubscribe when we don’t want to subscribe to an Observable anymore.