RxJS is a library for reactive programming. Creation operators generate Observables from various data sources so that Observers can subscribe to them.
In this article, we’ll look at some join creation operators, which combine data from multiple Observables into one Observable: merge, race, and zip. We’ll also look at the buffer and bufferCount transformation operators.
Join Creation Operators
These operators combine the values emitted by multiple Observables into one Observable.
merge
The merge operator takes multiple Observables, subscribes to them concurrently, and emits every value from each input Observable as it arrives.
It takes a comma-separated list of Observables as arguments.
For example, we can use it as follows:
import { merge, of } from "rxjs";
const observable1 = of(1, 2, 3);
const observable2 = of(4, 5, 6);
const combined = merge(observable1, observable2);
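// Logs 1, 2, 3, 4, 5, 6: of() emits synchronously, so observable1 finishes
// before merge subscribes to observable2.
combined.subscribe(x => console.log(x));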
Another example would be combining multiple timed Observables as follows:
import { merge, interval } from "rxjs";
const observable1 = interval(1000);
const observable2 = interval(2000);
const combined = merge(observable1, observable2);
combined.subscribe(x => console.log(x));
We’ll see that observable1 emits first, after 1 second, and observable2 emits its first value after 2 seconds. After that, observable1 keeps emitting every second and observable2 every 2 seconds, with the values from both interleaved in the output.
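The interleaving described above can be sketched as a timeline. This is a plain-JavaScript model rather than RxJS code: intervalEvents and mergeModel are hypothetical helpers, and the rule that observable1 comes first when both emit at the same instant is an assumption of the model, not a guarantee made by real timers.

```javascript
// Hypothetical model of merge(interval(1000), interval(2000)); no RxJS involved.
function intervalEvents(name, periodMs, untilMs) {
  const events = [];
  // interval(period) emits 0, 1, 2, ... at period, 2 * period, ...
  for (let i = 0; (i + 1) * periodMs <= untilMs; i++) {
    events.push({ time: (i + 1) * periodMs, source: name, value: i });
  }
  return events;
}

function mergeModel(...eventLists) {
  // Array.prototype.sort is stable, so events with the same time keep
  // the order of their sources (an assumption of this model).
  return eventLists.flat().sort((a, b) => a.time - b.time);
}

const timeline = mergeModel(
  intervalEvents("observable1", 1000, 4000),
  intervalEvents("observable2", 2000, 4000)
);
for (const e of timeline) {
  console.log(`${e.time}ms ${e.source} -> ${e.value}`);
}
```

In the first 4 seconds the model lists four values from observable1 and two from observable2, merged into a single sequence ordered by time.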
race
The race operator takes multiple Observables and returns an Observable that mirrors whichever input Observable emits first.
It takes a comma-separated list of Observables as arguments.
For example, we can use it as follows:
import { race, of } from "rxjs";
const observable1 = of(1, 2, 3);
const observable2 = of(4, 5, 6);
const combined = race(observable1, observable2);
combined.subscribe(x => console.log(x));
Because of emits synchronously, observable1 emits as soon as it is subscribed, before observable2. We should get the output:
1
2
3
since observable1 emits values first, and the values from observable2 are ignored.
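To make the selection rule concrete for timed sources, here is a minimal plain-JavaScript sketch that models race without RxJS. It assumes each source is described by a list of { time, value } events with at least one entry; raceModel and the event format are hypothetical and not part of the RxJS API.

```javascript
// Hypothetical model: each source is a non-empty array of { time, value }
// events sorted by time. This is not how RxJS implements race.
function raceModel(...sources) {
  let winner = sources[0];
  for (const source of sources) {
    // Strict "<" keeps the earlier-listed source when first emissions tie.
    if (source[0].time < winner[0].time) {
      winner = source;
    }
  }
  // Only the winner's values reach the output; the others are dropped.
  return winner.map(event => event.value);
}

const slow = [{ time: 300, value: "a" }, { time: 400, value: "b" }];
const fast = [{ time: 100, value: "x" }, { time: 900, value: "y" }];
const raceResult = raceModel(slow, fast);
console.log(raceResult); // the values of fast: "x", then "y"
```

Note that fast still wins even though its second value arrives later than everything from slow: only the first emission decides the race.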
zip
The zip operator combines multiple Observables and returns an Observable whose values are calculated from the values of each input Observable, in order: the first values of every input are combined, then the second values, and so on.
It takes a list of Observables as arguments. We can use it as follows:
import { zip, of } from "rxjs";
const observable1 = of(1, 2, 3);
const observable2 = of(4, 5, 6);
const combined = zip(observable1, observable2);
combined.subscribe(x => console.log(x));
Then we get the following:
[1, 4]
[2, 5]
[3, 6]
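The pairing by position can be sketched on plain arrays. This is a simplified model, not RxJS code: zipModel is a hypothetical helper. It also shows what happens when the inputs have different lengths, where values without a partner are never emitted.

```javascript
// Hypothetical helper that models zip on plain arrays; not the RxJS implementation.
function zipModel(...inputs) {
  if (inputs.length === 0) return [];
  // A combined value needs one value from every input at the same position.
  const length = Math.min(...inputs.map(input => input.length));
  const result = [];
  for (let i = 0; i < length; i++) {
    result.push(inputs.map(input => input[i]));
  }
  return result;
}

const zipped = zipModel([1, 2, 3], [4, 5]);
console.log(zipped); // [[1, 4], [2, 5]]; 3 has no partner and is dropped
```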
We can also map each pair to an object, which makes values from one Observable easier to distinguish from the other. To do this, we can write the following:
import { zip, of } from "rxjs";
import { map } from "rxjs/operators";
const age$ = of(1, 2, 3);
const name$ = of("John", "Mary", "Jane");
const combined = zip(age$, name$);
combined
.pipe(map(([age, name]) => ({ age, name })))
.subscribe(x => console.log(x));
Transformation Operators
buffer
The buffer operator collects the source Observable’s values into an array until the closingNotifier emits, then emits that array and starts a new buffer.
It takes one argument, closingNotifier, which is an Observable that signals when the current buffer should be emitted on the output Observable.
For example, we can use it as follows:
import { fromEvent, timer } from "rxjs";
import { buffer } from "rxjs/operators";
const observable = timer(1000, 1000);
const clicks = fromEvent(document, "click");
const buffered = observable.pipe(buffer(clicks));
buffered.subscribe(x => console.log(x));
In the code above, we have an Observable created by the timer function, which waits 1 second and then emits a number every second. We pass the clicks Observable, which emits whenever the document is clicked, to buffer as the closingNotifier.
This means that each time we click the page, the buffer operator emits an array of the values collected since the previous click. Depending on how much time passes between clicks, we’ll get anything from an empty array to an array with several values.
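The relationship between clicks and emitted arrays can be sketched with timestamps. This is a plain-JavaScript model, not RxJS code: bufferModel is a hypothetical helper and the click times are made up for illustration.

```javascript
// Hypothetical model of source.pipe(buffer(notifier)) using timestamps in ms.
// sourceEvents: [{ time, value }], notifierTimes: [time], both sorted ascending.
function bufferModel(sourceEvents, notifierTimes) {
  const output = [];
  let next = 0; // index of the first source event not yet placed in a buffer
  for (const notifyTime of notifierTimes) {
    const current = [];
    while (next < sourceEvents.length && sourceEvents[next].time < notifyTime) {
      current.push(sourceEvents[next].value);
      next++;
    }
    output.push(current); // emitted even when it is empty
  }
  return output;
}

// timer(1000, 1000) emits 0 at 1000 ms, 1 at 2000 ms, and so on.
const ticks = [0, 1, 2, 3, 4].map(n => ({ time: (n + 1) * 1000, value: n }));
const clickTimes = [2500, 2700, 5200]; // made-up click times
const buffers = bufferModel(ticks, clickTimes);
console.log(buffers); // [[0, 1], [], [2, 3, 4]]
```

The second click arrives before the timer has produced anything new, which is why its buffer is an empty array.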
bufferCount
bufferCount is slightly different from buffer in that it buffers the data until the buffer reaches bufferSize values, rather than waiting for another Observable to emit.
It takes 2 arguments: bufferSize, which is the maximum size of each buffer, and startBufferEvery, an optional parameter indicating the interval, counted in source values, at which to start a new buffer. If startBufferEvery is omitted, a new buffer starts only after the previous one has been emitted.
For example, we can use it as follows:
import { fromEvent } from "rxjs";
import { bufferCount } from "rxjs/operators";
const clicks = fromEvent(document, "click");
const buffered = clicks.pipe(bufferCount(10));
buffered.subscribe(x => console.log(x));
The code above emits an array of 10 MouseEvent objects after every 10th click, since that is when the buffer reaches the bufferSize of 10.
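The effect of startBufferEvery is easier to see on a finite list of numbers than on clicks. The following is a plain-JavaScript model, not RxJS code: bufferCountModel is a hypothetical helper. It assumes the source completes after its last value, at which point any partially filled buffers are emitted as well.

```javascript
// Hypothetical model of bufferCount on a finite array; not the RxJS implementation.
function bufferCountModel(values, bufferSize, startBufferEvery = bufferSize) {
  const output = [];
  let open = []; // buffers that are still collecting values
  values.forEach((value, index) => {
    if (index % startBufferEvery === 0) open.push([]); // start a new buffer
    for (const buffer of open) buffer.push(value);
    // Emit and close every buffer that has reached bufferSize.
    for (const buffer of open) {
      if (buffer.length >= bufferSize) output.push(buffer);
    }
    open = open.filter(buffer => buffer.length < bufferSize);
  });
  // When the source completes, partially filled buffers are emitted too.
  return output.concat(open);
}

const plain = bufferCountModel([1, 2, 3, 4, 5], 2);
const overlapping = bufferCountModel([1, 2, 3, 4, 5], 3, 1);
console.log(plain);       // [[1, 2], [3, 4], [5]]
console.log(overlapping); // [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
```

With startBufferEvery set to 1, a new buffer starts on every value, so consecutive buffers overlap and behave like a sliding window.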
As we can see, the join creation operators let us combine Observables’ emitted data in several ways. We can merge all the emitted values into one stream as they arrive, mirror only the Observable that emits first, or pair values up in order.
Also, we can buffer an Observable’s emitted data and emit it either when a given amount is buffered or when a triggering Observable emits.