Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.
In this article, we’ll look at some join operators, including exhaust
, mergeAll
, startsWith
and withLatestFrom
.
exhaust
The exhaust
operator converts higher-order Observables into first-order Observables by dropping inner Observables while previous inner Observable hasn’t completed.
It takes no parameters and returns an Observable that takes the source Observables and propagates values from the first Observable exclusively until it completes before subscribing to the next one.
For example, we can use it as follows:
import { of, interval } from "rxjs";
import { map, exhaust, take } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => interval(500).pipe(take(3))));
const result = higherOrderObservable.pipe(exhaust());
result.subscribe(x => console.log(x));
The code above maps the values from the of$
Observable into interval(500).pipe(take(3))
Observables, which emit numbers up to 2 every half a second.
Then we pipe
the interval(500).pipe(take(3))
Observables to the exhaust
operator. Then all Observables other than the first interval(500).pipe(take(3))
are dropped since it has finished emitting while the next ones are to be executed.
Then we get:
0
1
2
outputted from console.log
.
mergeAll
mergeAll
converts higher-order Observables to first-order Observables which concurrently delivers all values emitted by inner Observables.
It takes the optionalconcurrent
argument, which defaults to Number.INFINITY
for the maximum number of inner Observables being subscribed to concurrently.
mergeAll
subscribes to all inner Observables within higher-order Observables and delivers all the values from them on the output Observable. The returned output Observable only completes when all inner Observables are completed.
Any error emitted by inner Observables will immediately result in errors emitted by the returned Observable.
For instance, we can use it as follows:
import { of } from "rxjs";
import { map, mergeAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(mergeAll());
result.subscribe(x => console.log(x));
The code above maps each emitted values from the of$
Observable to of(“a”, “b”, “c”)
Observables.
Then the mergeAll
operator subscribes to all the of(“a”, “b”, “c”)
Observables and then subscribe to each of them and then emit the values of each.
Then we get:
a
b
c
a
b
c
a
b
c
from console.log
.
We can also change the concurrency
by passing in a number to the mergeAll
operator.
For example, we can write:
import { of, interval } from "rxjs";
import { map, mergeAll, take } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(
map(val => interval(1000).pipe(take(2)))
);
const result = higherOrderObservable.pipe(mergeAll(1));
result.subscribe(x => console.log(x));
to make mergeAll
subscribe to each child Observable returned from the map
operator’s callback sequentially, which will get us:
0
1
0
1
0
1
from the console.log
.
Or we can change 1 to 5 as follows:
import { of, interval } from "rxjs";
import { map, mergeAll, take } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(
map(val => interval(1000).pipe(take(2)))
);
const result = higherOrderObservable.pipe(mergeAll(5));
result.subscribe(x => console.log(x));
Then we get:
(3) 0
(3) 1
outputted from console.log
as they’re subscribed to concurrently.
startWith
startsWith
returns an Observable that emits the items we want to emit before it begins to emit items from the source Observable.
It takes one argument, which is an array
of items that we want to be emitted before the values from the source Observable by the returned Observable.
For example, we can use it as follows:
import { of } from "rxjs";
import { startWith } from "rxjs/operators";
of(1)
.pipe(startWith("foo", "bar"))
.subscribe(x => console.log(x));
Then we get:
foo
bar
1
as the console.log
output.
withLatestFrom
The withLatestFrom
operator combines the source Observable with other Observables to create an Observable which emits values that are calculated from the latest values of each, only when the source emits.
It takes a list of arguments, which is an Observable to combine the values with.
For example, we can use it as follows:
import { fromEvent, interval } from "rxjs";
import { withLatestFrom } from "rxjs/operators";
const clicks = fromEvent(window, "click");
const timer = interval(1000);
const result = clicks.pipe(withLatestFrom(timer));
result.subscribe(x => console.log(x));
We track the clicks of the browser tab with the fromEvent
operator. Then we combine the results emitted from the timer
Observable with it by using the withLatestFrom
operator.
In the end, the result
Observable will emit arrays that have the MouseEvent
object from the clicks
Observable as the first value and the number from the timer
Observable as the second value.
The exhaust
operator converts higher-order Observables into first-order Observables by dropping inner Observables while previous inner Observable hasn’t completed.
mergeAll
converts higher-order Observables to first-order Observables which concurrently delivers all values emitted by inner Observables.
startsWith
returns an Observable that emits the items we want to emit before the source Observable begins emitting values.
The withLatestFrom
operator combines the source Observable with another Observable and returns an Observable that emits the latest values from both.