Categories
JavaScript Rxjs

Rxjs Operators — More Join Operators

Spread the love

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some join operators, including exhaust , mergeAll , startsWith and withLatestFrom .

exhaust

The exhaust operator converts higher-order Observables into first-order Observables by dropping inner Observables while previous inner Observable hasn’t completed.

It takes no parameters and returns an Observable that takes the source Observables and propagates values from the first Observable exclusively until it completes before subscribing to the next one.

For example, we can use it as follows:

import { of, interval } from "rxjs";  
import { map, exhaust, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => interval(500).pipe(take(3))));  
const result = higherOrderObservable.pipe(exhaust());  
result.subscribe(x => console.log(x));

The code above maps the values from the of$ Observable into interval(500).pipe(take(3)) Observables, which emit numbers up to 2 every half a second.

Then we pipe the interval(500).pipe(take(3)) Observables to the exhaust operator. Then all Observables other than the first interval(500).pipe(take(3)) are dropped since it has finished emitting while the next ones are to be executed.

Then we get:

0  
1  
2

outputted from console.log .

mergeAll

mergeAll converts higher-order Observables to first-order Observables which concurrently delivers all values emitted by inner Observables.

It takes the optionalconcurrent argument, which defaults to Number.INFINITY for the maximum number of inner Observables being subscribed to concurrently.

mergeAll subscribes to all inner Observables within higher-order Observables and delivers all the values from them on the output Observable. The returned output Observable only completes when all inner Observables are completed.

Any error emitted by inner Observables will immediately result in errors emitted by the returned Observable.

For instance, we can use it as follows:

import { of } from "rxjs";  
import { map, mergeAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(mergeAll());  
result.subscribe(x => console.log(x));

The code above maps each emitted values from the of$ Observable to of(“a”, “b”, “c”) Observables.

Then the mergeAll operator subscribes to all the of(“a”, “b”, “c”) Observables and then subscribe to each of them and then emit the values of each.

Then we get:

a  
b  
c  
a  
b  
c  
a  
b  
c

from console.log .

We can also change the concurrency by passing in a number to the mergeAll operator.

For example, we can write:

import { of, interval } from "rxjs";  
import { map, mergeAll, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(  
  map(val => interval(1000).pipe(take(2)))  
);  
const result = higherOrderObservable.pipe(mergeAll(1));  
result.subscribe(x => console.log(x));

to make mergeAll subscribe to each child Observable returned from the map operator’s callback sequentially, which will get us:

0  
1  
0  
1  
0  
1

from the console.log .

Or we can change 1 to 5 as follows:

import { of, interval } from "rxjs";  
import { map, mergeAll, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(  
  map(val => interval(1000).pipe(take(2)))  
);  
const result = higherOrderObservable.pipe(mergeAll(5));  
result.subscribe(x => console.log(x));

Then we get:

(3) 0  
(3) 1

outputted from console.log as they’re subscribed to concurrently.

startWith

startsWith returns an Observable that emits the items we want to emit before it begins to emit items from the source Observable.

It takes one argument, which is an array of items that we want to be emitted before the values from the source Observable by the returned Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { startWith } from "rxjs/operators";

of(1)  
  .pipe(startWith("foo", "bar"))  
  .subscribe(x => console.log(x));

Then we get:

foo  
bar  
1

as the console.log output.

withLatestFrom

The withLatestFrom operator combines the source Observable with other Observables to create an Observable which emits values that are calculated from the latest values of each, only when the source emits.

It takes a list of arguments, which is an Observable to combine the values with.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { withLatestFrom } from "rxjs/operators";

const clicks = fromEvent(window, "click");  
const timer = interval(1000);  
const result = clicks.pipe(withLatestFrom(timer));  
result.subscribe(x => console.log(x));

We track the clicks of the browser tab with the fromEvent operator. Then we combine the results emitted from the timer Observable with it by using the withLatestFrom operator.

In the end, the result Observable will emit arrays that have the MouseEvent object from the clicks Observable as the first value and the number from the timer Observable as the second value.

The exhaust operator converts higher-order Observables into first-order Observables by dropping inner Observables while previous inner Observable hasn’t completed.

mergeAll converts higher-order Observables to first-order Observables which concurrently delivers all values emitted by inner Observables.

startsWith returns an Observable that emits the items we want to emit before the source Observable begins emitting values.

The withLatestFrom operator combines the source Observable with another Observable and returns an Observable that emits the latest values from both.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.

Leave a Reply

Your email address will not be published. Required fields are marked *