More Useful Rxjs Transformation Operators

RxJS is a library for reactive programming. Transformation operators take the values emitted by a source Observable and change them, or map them to other Observables, before they reach Observers.

In this article, we’ll look at the concatMapTo, exhaust, exhaustMap, and expand transformation operators.

concatMapTo

The concatMapTo operator takes the source value of the Observable and maps each value to the same inner Observable.

It takes up to 2 arguments. The first is innerObservable, which is the Observable that each value from the source Observable is mapped to.

The second argument is an optional resultSelector function that lets us compute the emitted result from the source and inner values. It is deprecated in current RxJS releases.

Each value emitted by the source Observable is mapped to the given innerObservable, regardless of the source value. The resulting inner Observables are then flattened into one Observable, which is the returned Observable.

The values emitted by each inner Observable are concatenated in order: the next inner Observable is subscribed to only after the previous one completes.

If the source values arrive endlessly and faster than the corresponding inner Observables can complete, memory usage keeps growing, since more and more inner Observables pile up waiting to be subscribed to.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { take, concatMapTo } from "rxjs/operators";
const clicks = fromEvent(document, "click");  
const result = clicks.pipe(concatMapTo(interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

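The code above listens for click events on the document. For each click, it emits 0 after 1 second and then another number every second, up to 4. If we click again while a sequence is still running, the next sequence starts only after the current one finishes.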
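
To get a feel for the backlog that builds up when clicks arrive faster than the inner Observables complete, here’s a rough sketch in plain JavaScript. It doesn’t use RxJS at all: concatStartTimes is a made-up helper that only models the queueing, and it assumes every inner Observable takes the same fixed time to complete.

```javascript
// Toy model of concatMapTo's queueing, not RxJS code.
// sourceTimes: times (ms) at which the source emits, in ascending order.
// innerDuration: time (ms) each inner Observable needs to complete (assumed fixed).
function concatStartTimes(sourceTimes, innerDuration) {
  const starts = [];
  let freeAt = 0; // time at which the previous inner Observable completes
  for (const t of sourceTimes) {
    // An inner Observable starts when its source value arrives or when the
    // previous inner Observable completes, whichever is later.
    const start = Math.max(t, freeAt);
    starts.push(start);
    freeAt = start + innerDuration;
  }
  return starts;
}

// Three clicks 1 second apart, each mapped to a 5 second inner Observable.
const clickTimes = [0, 1000, 2000];
const startTimes = concatStartTimes(clickTimes, 5000);
// Delay between each click and the start of its inner Observable.
const waits = startTimes.map((start, i) => start - clickTimes[i]);

console.log(startTimes.join(", ")); // 0, 5000, 10000
console.log(waits.join(", ")); // 0, 4000, 8000
```

The wait grows with every click, so a source that never slows down keeps adding to the queue.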

exhaust

The exhaust operator converts a higher-order Observable, which is an Observable that emits Observables, into a first-order Observable. It subscribes to an inner Observable and drops any other inner Observables that arrive while the current one hasn’t completed. In RxJS 7, the operator was renamed to exhaustAll, and exhaust remains as a deprecated alias.

The result is that the values of the inner Observables that weren’t dropped are flattened and emitted by one Observable, which is the one returned by this operator.

For example, we can write:

import { interval, of } from "rxjs";  
import { exhaust, map, take } from "rxjs/operators";
const observable = of(1, 2, 3);  
const higherOrder = observable.pipe(map(ev => interval(1000).pipe(take(5))));  
const result = higherOrder.pipe(exhaust());  
result.subscribe(x => console.log(x));

In the code above, we have the observable Observable, which emits 1, 2, and 3. The map operator maps each of these values to an inner Observable, interval(1000).pipe(take(5)).

Since exhaust drops inner Observables that arrive while an earlier one is still running, and all three arrive at once, only the first interval(1000).pipe(take(5)) is subscribed to. Then we get a number emitted every second, starting with 0 and stopping at 4.
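
The dropping rule can be sketched in plain JavaScript. This is only a model and not how RxJS implements exhaust: exhaustDecisions is a hypothetical helper, and it assumes every inner Observable runs for the same fixed time.

```javascript
// Toy model of exhaust's dropping rule, not RxJS code.
// arrivalTimes: times (ms) at which the outer Observable emits an inner Observable.
// innerDuration: time (ms) each inner Observable needs to complete (assumed fixed).
function exhaustDecisions(arrivalTimes, innerDuration) {
  let busyUntil = -Infinity; // completion time of the inner Observable currently running
  const decisions = [];
  for (const t of arrivalTimes) {
    if (t < busyUntil) {
      decisions.push("dropped"); // an earlier inner Observable is still running
    } else {
      decisions.push("subscribed");
      busyUntil = t + innerDuration;
    }
  }
  return decisions;
}

// of(1, 2, 3) emits everything at once, so all inner Observables arrive at time 0.
const allAtOnce = exhaustDecisions([0, 0, 0], 5000);
console.log(allAtOnce.join(", ")); // subscribed, dropped, dropped

// An inner Observable that arrives after the first one completes is subscribed to.
const spreadOut = exhaustDecisions([0, 2000, 6000], 5000);
console.log(spreadOut.join(", ")); // subscribed, dropped, subscribed
```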

exhaustMap

The exhaustMap operator works like exhaust in that it ignores new inner Observables while the current one hasn’t completed. The difference is that it takes a function that maps each value emitted by the source Observable to an inner Observable, so the mapping and the flattening happen in one step. Source values that arrive while an inner Observable is still running are ignored.

For example, if we have the following code:

import { interval, of } from "rxjs";  
import { take, exhaustMap } from "rxjs/operators";
const observable = of(1, 2, 3);  
const result = observable.pipe(exhaustMap(ev => interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

We get the same result as the exhaust example, but we don’t have to map the emitted values of observable to a new Observable in a separate step. Instead, exhaustMap does the mapping and the flattening together.
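
To show how the mapping function and the ignoring of source values fit together, here’s a small model in plain JavaScript. It isn’t RxJS code: exhaustMapTimeline and toTicks are invented for this sketch, source values carry explicit timestamps, and an inner sequence is assumed to complete at its last emission.

```javascript
// Toy model of exhaustMap, not RxJS code.
// source: array of { time, value } emitted by the source, sorted by time (ms).
// project: maps a source value to a non-empty array of { offset, value } emissions.
// The inner sequence is assumed to complete at its last offset.
function exhaustMapTimeline(source, project) {
  const output = [];
  let busyUntil = -Infinity; // completion time of the inner sequence currently running
  for (const { time, value } of source) {
    if (time < busyUntil) {
      continue; // an inner sequence is still running, so this source value is ignored
    }
    const inner = project(value);
    for (const emission of inner) {
      output.push({ time: time + emission.offset, value: emission.value });
    }
    busyUntil = time + inner[inner.length - 1].offset;
  }
  return output;
}

// Each source value is mapped to three emissions, one per second, that include the value.
const toTicks = label =>
  [1, 2, 3].map(n => ({ offset: n * 1000, value: label + n }));

const timeline = exhaustMapTimeline(
  [
    { time: 0, value: "a" },
    { time: 2000, value: "b" }, // arrives while the sequence for "a" is running
    { time: 6000, value: "c" }
  ],
  toTicks
);

console.log(timeline.map(e => e.value).join(", ")); // a1, a2, a3, c1, c2, c3
```

The value "b" never reaches the output because it arrives while the sequence for "a" is still running.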

expand

The expand operator recursively projects each source value to an Observable which is then merged into the output Observable.

It takes up to 3 arguments. The first is the project function, which is applied to the items emitted by the source Observable and to the items emitted by the output Observable, and returns a new Observable.

The second argument is the concurrency, which is an optional argument for the maximum number of input Observables being subscribed to concurrently. It defaults to Number.POSITIVE_INFINITY.

The last argument is the optional scheduler object, which is the scheduler used for subscribing to each projected inner Observable.

One simple example usage of this operator would be as follows:

import { of } from "rxjs";  
import { expand, delay, take } from "rxjs/operators";
const repeated = of(1, 2, 3).pipe(
  expand(x => of(x).pipe(delay(1000))),
  take(10)
);
repeated.subscribe(x => console.log(x));

In the code above, the values emitted by the of(1, 2, 3) Observable are piped to the expand(x => of(x).pipe(delay(1000))) operator. expand emits each value it receives and also passes it to the project function, whose result is fed back into expand. Here the project function returns the same value after a 1 second delay, so 1, 2, and 3 are emitted immediately and then emitted again as a group every second. take(10) ends the sequence after 10 values.

This is because of(x) emits the value it was given without making any changes, and pipe(delay(1000)) delays that emission by one second, so the groups of 1, 2, and 3 are one second apart.

This should result in the following output:

1  
2  
3  
1  
2  
3  
1  
2  
3  
1
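
The feedback loop behind this output can be modelled in plain JavaScript. The sketch below doesn’t use RxJS: expandTimeline is a hypothetical helper that assumes each projected Observable emits exactly one value after a fixed delay, which is the case in the example above.

```javascript
// Toy model of expand with a delayed projection, not RxJS code.
// seeds: values the source emits at time 0.
// project: maps a value to the single value its inner Observable emits.
// delayMs: delay (ms) before each inner Observable emits.
// count: number of output values to take.
function expandTimeline(seeds, project, delayMs, count) {
  const output = [];
  // Values waiting to be emitted, each with the time at which it is emitted.
  let pending = seeds.map(value => ({ time: 0, value }));
  while (output.length < count && pending.length > 0) {
    const next = [];
    for (const item of pending) {
      if (output.length === count) break;
      output.push(item);
      // Every emitted value is fed back into project, which makes expand recursive.
      next.push({ time: item.time + delayMs, value: project(item.value) });
    }
    pending = next;
  }
  return output;
}

// Same setup as the example above: the projection returns the value unchanged.
const repeatedRun = expandTimeline([1, 2, 3], x => x, 1000, 10);
console.log(repeatedRun.map(e => e.value).join(", ")); // 1, 2, 3, 1, 2, 3, 1, 2, 3, 1
console.log(repeatedRun.map(e => e.time).join(", ")); // 0, 0, 0, 1000, 1000, 1000, 2000, 2000, 2000, 3000

// With a projection that changes the value, each round builds on the previous one.
const doubled = expandTimeline([1], x => 2 * x, 1000, 5);
console.log(doubled.map(e => e.value).join(", ")); // 1, 2, 4, 8, 16
```

The second run shows why expand is useful for recursive sequences: each emitted value becomes the input of the next projection.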

Conclusion

The concatMapTo operator maps each value emitted by the source Observable to the same inner Observable and concatenates the results.

The exhaust operator flattens an Observable that emits Observables by emitting the values of the first inner Observable and dropping any inner Observables that arrive before it completes.

The exhaustMap operator does the same, but first maps each source value to an inner Observable with the function we pass in.

Finally, the expand operator recursively projects each source value to an Observable which is then merged into the output Observable and emitted.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
