Some Useful Rxjs Transformation Operators

RxJS is a library for reactive programming. Transformation operators take the values emitted by a source Observable and reshape them, for example by collecting them into arrays or mapping them to other Observables, before they reach Observers.

In this article, we’ll look at some RxJS transformation operators: bufferTime, bufferToggle, bufferWhen, and concatMap.

bufferTime

The bufferTime operator buffers the emitted data of the originating Observable for a specific time period.

Its only required argument is bufferTimeSpan, the amount of time to fill each buffer array, in milliseconds.

Once that time is up, the buffer is emitted as an array and reset.

It also takes an optional bufferCreationInterval argument, which specifies how often a new buffer is started. When it’s given, the operator opens a new buffer every bufferCreationInterval milliseconds, and emits each buffer bufferTimeSpan milliseconds after it was opened.

Another optional argument is maxBufferSize, a number that specifies the maximum number of items a buffer can hold.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { bufferTime } from "rxjs/operators";

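// The source emits 0, 1, 2, ... every 2 seconds.
const source = interval(2000);
// Collect the source values into arrays, one array per second.
const buffered = source.pipe(bufferTime(1000));
buffered.subscribe(x => console.log(x));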

Since the source emits every 2 seconds and a buffer is emitted every second, roughly every other emitted array contains a single number, and the rest are empty.
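To make the timing easier to follow, here is a small dependency-free sketch of how values fall into one-second buffers. It is a simplified model, not how RxJS implements bufferTime; the function name bufferTimeModel, the { time, value } event shape, and the endTime parameter are assumptions made up for this illustration.

```javascript
// Simplified model of bufferTime(bufferTimeSpan). NOT the RxJS implementation.
// `events` is a hypothetical list of { time, value } pairs (time in ms since subscription).
function bufferTimeModel(events, bufferTimeSpan, endTime) {
  const bufferCount = Math.floor(endTime / bufferTimeSpan);
  // One array per window; window k covers [k * span, (k + 1) * span).
  const buffers = Array.from({ length: bufferCount }, () => []);
  for (const { time, value } of events) {
    const index = Math.floor(time / bufferTimeSpan);
    if (index < bufferCount) {
      buffers[index].push(value);
    }
  }
  return buffers;
}

// A source that emits 0, 1, 2 about every 2 seconds. The times are offset slightly
// from the 2-second marks so that no value falls exactly on a window boundary.
const sourceEvents = [
  { time: 2005, value: 0 },
  { time: 4010, value: 1 },
  { time: 6015, value: 2 }
];

console.log(JSON.stringify(bufferTimeModel(sourceEvents, 1000, 7000)));
// → [[],[],[0],[],[1],[],[2]]
```

With real timers, a value that arrives right at a window boundary can land in either of the two neighboring buffers, which is why the model avoids exact boundaries.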

The bufferCreationInterval argument can be used as follows:

import { interval } from "rxjs";  
import { bufferTime } from "rxjs/operators";

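// The source emits 0, 1, 2, ... every 2 seconds.
const source = interval(2000);
// Open a new buffer every second, and emit each buffer one second after it opens.
const buffered = source.pipe(bufferTime(1000, 1000));
buffered.subscribe(x => console.log(x));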

In the code above, a new buffer is opened every second, and each buffer is emitted one second after it was opened.
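The two arguments matter more when they differ. The sketch below models a bufferTimeSpan of 2000 with a bufferCreationInterval of 1000, so buffers overlap and one value can appear in two arrays. It is a simplified model rather than the RxJS implementation, and overlappingBufferModel, the event shape, and endTime are names assumed for this illustration.

```javascript
// Simplified model of bufferTime(bufferTimeSpan, bufferCreationInterval).
// This is an illustration, NOT the RxJS implementation.
function overlappingBufferModel(events, bufferTimeSpan, bufferCreationInterval, endTime) {
  const emitted = [];
  // A buffer opens at time 0 and then every bufferCreationInterval ms.
  for (let openedAt = 0; openedAt + bufferTimeSpan <= endTime; openedAt += bufferCreationInterval) {
    const closedAt = openedAt + bufferTimeSpan;
    // Each buffer holds the values that arrived while it was open.
    const values = events
      .filter(e => e.time >= openedAt && e.time < closedAt)
      .map(e => e.value);
    emitted.push({ openedAt, closedAt, values });
  }
  return emitted;
}

// Hypothetical source values and their arrival times in ms.
const events = [
  { time: 500, value: "a" },
  { time: 1500, value: "b" },
  { time: 2500, value: "c" }
];

// Each buffer lasts 2000 ms, but a new one opens every 1000 ms, so buffers overlap.
const result = overlappingBufferModel(events, 2000, 1000, 4000);
console.log(JSON.stringify(result.map(b => b.values)));
// → [["a","b"],["b","c"],["c"]]
```

Here "b" and "c" each show up twice because they arrive while two buffers are open at the same time.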

bufferToggle

bufferToggle buffers the source Observable values starting from the emission of openings and ending when the output of the closingSelector emits.

Values emitted by the originating Observable are buffered until the Observable returned by closingSelector emits, at which point the buffer is closed and emitted.

For example, if we have a button as follows:

<button>Click Me</button>

We can buffer the mouse click events on the button and then emit the buffered MouseEvent objects whenever the Observable returned by the closingSelector function emits, as follows:

import { fromEvent, interval, EMPTY } from "rxjs";  
import { bufferToggle } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("button"), "click");  
const openings = interval(1000);  
const buffered = clicks.pipe(  
  bufferToggle(openings, i => (i % 2 ? interval(1500) : EMPTY))  
);  
buffered.subscribe(x => console.log(x));

The closingSelector in the example above is:

i => (i % 2 ? interval(1500) : EMPTY)

The openings Observable emits every second, and each emission opens a new buffer that collects the button clicks. The value i emitted by openings is passed to the closingSelector. When i is odd, i % 2 is truthy, so interval(1500) is returned, and that buffer is closed and emitted as an array 1.5 seconds after it was opened. When i is even, EMPTY is returned, which completes without emitting a value, so no closing value is emitted for that opening.
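The windows in the example above can be worked out by hand. The sketch below does that with plain arrays: it is a simplified model, not the RxJS implementation, and bufferToggleModel, the windows list, and the click times are all assumptions for illustration. Openings paired with EMPTY are left out of the model because EMPTY never emits a closing value.

```javascript
// Simplified model of bufferToggle. NOT the RxJS implementation.
// `windows` lists hypothetical { openedAt, closedAt } pairs, one per opening
// whose closing Observable eventually emits.
function bufferToggleModel(events, windows) {
  return windows
    .slice()
    .sort((a, b) => a.closedAt - b.closedAt) // buffers are emitted when they close
    .map(({ openedAt, closedAt }) =>
      events.filter(e => e.time >= openedAt && e.time < closedAt).map(e => e.value)
    );
}

// Mirror the example: interval(1000) emits i = 0, 1, 2, ... at 1000 ms, 2000 ms, ...
// Only an odd i gets a closing Observable that emits (after 1500 ms).
const windows = [];
for (let i = 0; i < 5; i++) {
  const openedAt = (i + 1) * 1000;
  if (i % 2) {
    windows.push({ openedAt, closedAt: openedAt + 1500 });
  }
}

// Hypothetical click times in ms.
const clicks = [
  { time: 1200, value: "click@1200" },
  { time: 2300, value: "click@2300" },
  { time: 3100, value: "click@3100" },
  { time: 3800, value: "click@3800" },
  { time: 4700, value: "click@4700" }
];

console.log(JSON.stringify(bufferToggleModel(clicks, windows)));
// → [["click@2300","click@3100"],["click@4700"]]
```

The clicks at 1200 ms and 3800 ms are dropped in this model because no closing window is open when they happen.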

bufferWhen

The bufferWhen operator buffers data from the source Observable until the closingSelector function closes the buffer.

It takes one argument, the closingSelector function, which returns an Observable. When that Observable emits, the current buffer is closed and emitted, and a new buffer is opened right away.

For example, we can use it as we do in the following code:

import { fromEvent, interval } from "rxjs";  
import { bufferWhen } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("button"), "click");  
const buffered = clicks.pipe(  
  bufferWhen(() => interval(1000 + Math.random() * 4000))  
);  
buffered.subscribe(x => console.log(x));

The code above collects the button clicks into a buffer and emits the buffered MouseEvent objects as an array.

The Observable returned by closingSelector emits after 1000 + Math.random() * 4000 milliseconds, that is, somewhere between 1 and 5 seconds. When it does, the buffered values are emitted, the buffer is emptied, and the click events start being buffered again.
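Because the delays are random, the output of the example is hard to predict. The sketch below fixes the delays so the grouping can be followed step by step. It is a simplified model and not the RxJS implementation; bufferWhenModel, closingDelays, and the click times are assumed names and values for illustration.

```javascript
// Simplified model of bufferWhen. NOT the RxJS implementation.
// `closingDelays` stands in for how long each closing Observable takes to emit.
// The real example picks these at random; here they are fixed.
function bufferWhenModel(events, closingDelays) {
  const buffers = [];
  let openedAt = 0;
  for (const delay of closingDelays) {
    const closedAt = openedAt + delay;
    buffers.push(
      events.filter(e => e.time >= openedAt && e.time < closedAt).map(e => e.value)
    );
    // The next buffer opens as soon as the previous one closes.
    openedAt = closedAt;
  }
  return buffers;
}

// Hypothetical click times in ms.
const clickTimes = [
  { time: 400, value: "c1" },
  { time: 900, value: "c2" },
  { time: 2600, value: "c3" },
  { time: 6100, value: "c4" }
];

// Delays between 1000 and 5000 ms, like 1000 + Math.random() * 4000 would produce.
console.log(JSON.stringify(bufferWhenModel(clickTimes, [1500, 4200, 1000])));
// → [["c1","c2"],["c3"],["c4"]]
```

Unlike the overlapping buffers that bufferTime can produce, the buffers here never overlap: each click ends up in exactly one array.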

concatMap

The concatMap operator maps each value from the originating Observable to an inner Observable, and subscribes to those inner Observables one at a time, in order. It waits for the current inner Observable to complete before subscribing to the next one.

It takes 2 arguments. The first is a project function, which takes a value emitted by the originating Observable and returns a new Observable. The second argument is the resultSelector. It’s an optional argument, deprecated in recent versions of RxJS, which is a function that combines each source value with each inner value to produce the value emitted by the returned Observable.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { concatMap, take } from "rxjs/operators";

const clicks = fromEvent(document, "click");  
const result = clicks.pipe(concatMap(ev => interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

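Each time we click, the code above emits 0 after 1 second, then 1 after another second, and so on until it reaches 4.

It’ll do the same thing for every click. If we click again while a sequence is still running, the next sequence only starts after the current one completes.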
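The queueing is easiest to see on a timeline. The sketch below computes when each value would be emitted for a given list of click times. It is a simplified model, not the RxJS implementation; concatMapTimeline and its parameters are names assumed for this illustration, and a shorter sequence of 3 values is used to keep the output small.

```javascript
// Simplified model of concatMap(ev => interval(period).pipe(take(count))).
// NOT the RxJS implementation; it only computes when each value would be emitted.
// `clickTimes` is assumed to be sorted in ascending order (ms).
function concatMapTimeline(clickTimes, period, count) {
  const emissions = [];
  let busyUntil = 0; // time at which the current inner sequence completes
  for (const clickTime of clickTimes) {
    // An inner sequence starts at the click, or when the previous one completes.
    const startedAt = Math.max(clickTime, busyUntil);
    for (let n = 0; n < count; n++) {
      emissions.push({ time: startedAt + (n + 1) * period, value: n });
    }
    busyUntil = startedAt + count * period;
  }
  return emissions;
}

// Two hypothetical clicks: the second arrives while the first sequence is still running.
const timeline = concatMapTimeline([0, 2000], 1000, 3);
console.log(JSON.stringify(timeline.map(e => [e.time, e.value])));
// → [[1000,0],[2000,1],[3000,2],[4000,0],[5000,1],[6000,2]]
```

The second click happens at 2000 ms, but its sequence only starts at 3000 ms, once the first sequence has completed.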

Conclusion

The bufferTime operator buffers the emitted data of the originating Observable for a specific time period, then the buffered data will be emitted as an array.

bufferToggle buffers the source Observable values starting from the emission of openings and ending when the output of the closingSelector function emits.

The bufferWhen operator buffers data from the source Observable until the Observable returned by the closingSelector function emits.

Finally, the concatMap operator maps each source value to an inner Observable and waits for each inner Observable to complete before subscribing to the next one.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
