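RxJS is a library for reactive programming. Transformation operators take the values emitted by a source Observable and reshape them, for example by grouping them into arrays or mapping them to other Observables, before they reach the Observers that subscribe to the result.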
In this article, we’ll look at some RxJS transformation operators: bufferTime, bufferToggle, bufferWhen, and concatMap.
bufferTime
The bufferTime operator buffers the emitted data of the originating Observable for a specific time period.
Its first argument, bufferTimeSpan, is the amount of time to fill each buffer array. The unit for the time span is milliseconds.
The buffered data is emitted as an array and the buffer is reset once the specified amount of time is up.
It also takes an optional bufferCreationInterval argument, which specifies how often a new buffer is opened. When it’s given, the operator opens a new buffer every bufferCreationInterval milliseconds, and each buffer is emitted and discarded bufferTimeSpan milliseconds after it was opened.
Another optional argument for this operator is maxBufferSize, a number that specifies the maximum number of items a buffer can hold.
For example, we can use it as follows:
import { interval } from "rxjs";
import { bufferTime } from "rxjs/operators";
const clicks = interval(2000);
const buffered = clicks.pipe(bufferTime(1000));
buffered.subscribe(x => console.log(x));
We should see an array logged every second. A new number appears in one of those arrays every 2 seconds, and the rest of the emitted arrays are empty.
The bufferCreationInterval argument can be used as follows:
import { interval } from "rxjs";
import { bufferTime } from "rxjs/operators";
const clicks = interval(2000);
const buffered = clicks.pipe(bufferTime(1000, 1000));
buffered.subscribe(x => console.log(x));
In the code above, the buffered data is emitted every second, and the buffer is created every second.
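To make the interaction between bufferTimeSpan and bufferCreationInterval easier to see without waiting on real timers, the following is a minimal plain-JavaScript sketch that models the windowing on a list of timestamped values. It is not how RxJS implements the operator. The function name modelBufferTime, the events array, and the endTime parameter are hypothetical, and the sketch assumes each window includes its opening instant and excludes its closing instant.

```javascript
// Simplified model of bufferTime's windowing; NOT RxJS's implementation.
// Assumption: each window is half-open, [open, open + bufferTimeSpan).
function modelBufferTime(events, bufferTimeSpan, bufferCreationInterval, endTime) {
  // Without a creation interval, the next buffer opens when the previous one closes.
  const step = bufferCreationInterval == null ? bufferTimeSpan : bufferCreationInterval;
  const emitted = [];
  for (let open = 0; open + bufferTimeSpan <= endTime; open += step) {
    const close = open + bufferTimeSpan;
    // Collect the values whose timestamps fall inside this window.
    emitted.push(
      events.filter(e => e.time >= open && e.time < close).map(e => e.value)
    );
  }
  return emitted;
}

// Hypothetical source emissions, with times in milliseconds.
const events = [
  { time: 500, value: "a" },
  { time: 700, value: "b" },
  { time: 2100, value: "c" }
];

// Back-to-back 1-second buffers over 3 seconds.
console.log(modelBufferTime(events, 1000, null, 3000)); // [["a","b"], [], ["c"]]
// 1-second buffers opened every 2 seconds over 5 seconds.
console.log(modelBufferTime(events, 1000, 2000, 5000)); // [["a","b"], ["c"], []]
```

In the second call, a value emitted between 1000 and 2000 milliseconds would be dropped, since no buffer is open during that gap.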
bufferToggle
bufferToggle buffers the source Observable values starting from an emission of openings and ending when the output of the closingSelector emits.
Values emitted by the originating Observable are buffered until the Observable returned by the closingSelector emits, at which point the buffer is emitted and closed.
For example, if we have a button as follows:
<button>Click Me</button>
We can buffer the mouse click events on a button and then emit the MouseEvent objects that were buffered, with the closingSelector function deciding when each buffer closes, as follows:
import { fromEvent, interval, EMPTY } from "rxjs";
import { bufferToggle } from "rxjs/operators";
const clicks = fromEvent(document.querySelector("button"), "click");
const openings = interval(1000);
const buffered = clicks.pipe(
  bufferToggle(openings, i => (i % 2 ? interval(1500) : EMPTY))
);
buffered.subscribe(x => console.log(x));
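The closingSelector in the example above is:
i => (i % 2 ? interval(1500) : EMPTY)
The openings Observable emits a number i every second, and each emission calls the closingSelector with that number. When i is odd, i % 2 is truthy, so the closingSelector returns interval(1500). The clicks made from that moment on are collected into an array, and the array is emitted when interval(1500) first emits, 1.5 seconds later. When i is even, the closingSelector returns EMPTY, which completes without ever emitting a value, so that opening gets no 1.5-second collection window. How a closing Observable that completes without emitting is treated has differed between RxJS versions, so it’s worth checking the documentation for the version in use.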
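The timing of the example above can be traced with a small plain-JavaScript model. This is only a sketch under stated assumptions, not RxJS’s implementation: the function name modelBufferToggle, the clickTimes array, and the endTime parameter are hypothetical, openings are assumed to fire at 1000, 2000, 3000 milliseconds and so on with indices 0, 1, 2, and an opening that receives EMPTY is modelled as collecting nothing.

```javascript
// Simplified model of the bufferToggle example; NOT RxJS's implementation.
// Assumption: opening i fires at (i + 1) * 1000 ms, as interval(1000) would.
function modelBufferToggle(clickTimes, endTime) {
  const emitted = [];
  for (let i = 0, openedAt = 1000; openedAt + 1500 <= endTime; i++, openedAt += 1000) {
    // Even i corresponds to the EMPTY branch: modelled as collecting nothing.
    if (i % 2 === 0) continue;
    // Odd i corresponds to interval(1500): the buffer closes 1.5 seconds later.
    const closedAt = openedAt + 1500;
    emitted.push({
      openedAt,
      closedAt,
      clicks: clickTimes.filter(t => t >= openedAt && t < closedAt)
    });
  }
  return emitted;
}

// Hypothetical click timestamps in milliseconds.
const clickTimes = [1200, 2100, 3400, 3700, 4500];
console.log(modelBufferToggle(clickTimes, 6000));
// [
//   { openedAt: 2000, closedAt: 3500, clicks: [2100, 3400] },
//   { openedAt: 4000, closedAt: 5500, clicks: [4500] }
// ]
```

The clicks at 1200 and 3700 milliseconds fall outside both collection windows, so they never appear in an emitted array.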
bufferWhen
The bufferWhen operator buffers data from the source Observable until the closingSelector function closes the buffer.
It takes one argument, the closingSelector function, which returns an Observable that signals when the buffer will be closed.
For example, we can use it as we do in the following code:
import { fromEvent, interval } from "rxjs";
import { bufferWhen } from "rxjs/operators";
const clicks = fromEvent(document.querySelector("button"), "click");
const buffered = clicks.pipe(
  bufferWhen(() => interval(1000 + Math.random() * 4000))
);
buffered.subscribe(x => console.log(x));
Here, the button clicks are collected into a buffer. The Observable returned by the closingSelector emits 1000 + Math.random() * 4000 milliseconds (between 1 and 5 seconds) after the buffer was opened. When it does, the array of buffered MouseEvent objects is emitted, and a new, empty buffer starts collecting click events again.
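Because the closing delays in the example are random, the output is hard to predict. The plain-JavaScript sketch below replaces the random delays with a fixed list so that the back-to-back buffering can be followed by hand. It is a model rather than RxJS’s implementation, and the names modelBufferWhen, clickTimes, and closingDelays are hypothetical.

```javascript
// Simplified model of bufferWhen; NOT RxJS's implementation.
// closingDelays stands in for the random delays produced by the closingSelector.
function modelBufferWhen(clickTimes, closingDelays) {
  const emitted = [];
  let openedAt = 0;
  for (const delay of closingDelays) {
    const closedAt = openedAt + delay;
    // Assumption: a click exactly at the closing instant goes to the next buffer.
    emitted.push(clickTimes.filter(t => t >= openedAt && t < closedAt));
    // The next buffer opens as soon as the previous one closes.
    openedAt = closedAt;
  }
  return emitted;
}

// Hypothetical click timestamps and closing delays, in milliseconds.
console.log(modelBufferWhen([200, 1400, 1600, 4400, 5000], [1500, 3000, 2000]));
// [[200, 1400], [1600, 4400], [5000]]
```

Unlike the bufferToggle example, there are no gaps between buffers here, so every click ends up in exactly one emitted array.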
concatMap
The concatMap operator maps each value from the originating Observable to an inner Observable and subscribes to these inner Observables one at a time, in order. It waits for the current inner Observable to complete before it subscribes to the next one.
It takes 2 arguments. The first is a project function, which takes a value emitted from the originating Observable and returns a new Observable. The second argument is the resultSelector. It’s an optional argument, deprecated in recent versions of RxJS, which is a function that combines each source value with each inner value to produce the values emitted by the returned Observable.
For example, we can use it as follows:
import { fromEvent, interval } from "rxjs";
import { concatMap, take } from "rxjs/operators";
const clicks = fromEvent(document, "click");
const result = clicks.pipe(concatMap(ev => interval(1000).pipe(take(5))));
result.subscribe(x => console.log(x));
After a click, the code above emits 0 after 1 second, then 1 after another second, and so on until it reaches 4.
It’ll do the same thing for each click. If we click while a sequence is still running, the sequence for that click starts only after the current one completes.
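The queuing behavior can be traced with a short plain-JavaScript model of the example above. It is a sketch, not RxJS’s implementation: the function name modelConcatMap and the clickTimes array are hypothetical, and click times are assumed to be sorted in ascending order.

```javascript
// Simplified model of the concatMap example; NOT RxJS's implementation.
// Each click maps to the values 0..4, emitted one per second, and the
// sequences run strictly one after another.
function modelConcatMap(clickTimes) {
  const output = [];
  let busyUntil = 0; // time at which the currently running sequence completes
  for (const clickTime of clickTimes) {
    // A click made while a sequence is running waits for it to complete.
    const start = Math.max(clickTime, busyUntil);
    for (let k = 0; k < 5; k++) {
      output.push({ time: start + (k + 1) * 1000, value: k });
    }
    busyUntil = start + 5000;
  }
  return output;
}

// Hypothetical clicks at 0 ms and 2000 ms.
const timeline = modelConcatMap([0, 2000]);
console.log(timeline.map(e => e.time));
// [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]
console.log(timeline.map(e => e.value));
// [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
```

The second click arrives while the first sequence is still running, so its values start at 6000 milliseconds rather than 3000.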
The bufferTime operator buffers the emitted data of the originating Observable for a specific time period, then the buffered data will be emitted as an array.
bufferToggle buffers the source Observable values starting from an emission of openings and ending when the output of the closingSelector function emits.
The bufferWhen operator buffers data from the source Observable until the Observable returned by the closingSelector function emits.
Finally, the concatMap operator maps each source value of the originating Observable to an inner Observable and waits for each inner Observable to complete before subscribing to the next one.