More Rxjs Operators

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at the merge, race and zip join creation operators, which combine data from multiple Observables into one Observable. We’ll also look at the buffer and bufferCount transformation operators.

Join Creation Operators

These operators combine the values emitted from multiple Observables into one Observable.

merge

The merge operator takes multiple Observables and concurrently emits all values from every given input Observable.

It takes a comma-separated list of Observables as arguments, optionally followed by a number that limits how many of them are subscribed to at once.

For example, we can use it as follows:

import { merge, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = merge(observable1, observable2);  
combined.subscribe(x => console.log(x));

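Since of emits its values synchronously, this logs 1 through 6 in order: observable1 runs to completion before observable2 is subscribed to. Interleaving becomes visible when we combine multiple timed Observables as follows: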

import { merge, interval } from "rxjs";
const observable1 = interval(1000);  
const observable2 = interval(2000);  
const combined = merge(observable1, observable2);  
combined.subscribe(x => console.log(x));

Both counters start at 0. observable1 emits first, after 1 second, and observable2 emits its first value after 2 seconds. From then on, observable1 keeps emitting every second and observable2 every 2 seconds, and the combined Observable logs each value as it arrives.
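
To make the timing concrete, here is a plain-JavaScript sketch that does not use RxJS at all. It models each interval as a list of timestamped emissions and interleaves them by time. The helper names intervalTimeline and mergeTimelines are hypothetical, and the sketch assumes that when two sources emit at the same instant, the one passed first is listed first; real timer ordering may differ.

```javascript
// Model interval(period): value n is emitted at time (n + 1) * period.
function intervalTimeline(source, period, until) {
  const events = [];
  for (let n = 0; (n + 1) * period <= until; n++) {
    events.push({ time: (n + 1) * period, source, value: n });
  }
  return events;
}

// Model merge: interleave every event by emission time.
// Array.prototype.sort is stable, so ties keep argument order
// (an assumption of this model, not a guarantee about real timers).
function mergeTimelines(...timelines) {
  return timelines.flat().sort((a, b) => a.time - b.time);
}

const merged = mergeTimelines(
  intervalTimeline("observable1", 1000, 4000),
  intervalTimeline("observable2", 2000, 4000)
);

for (const e of merged) {
  console.log(`${e.time}ms ${e.source} -> ${e.value}`);
}
```

Over the first 4 seconds the model lists six emissions: four from observable1 and two from observable2, with both sources emitting at the 2 second and 4 second marks.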

race

The race operator takes multiple Observables and returns an Observable that mirrors whichever input Observable emits an item first. The other inputs are unsubscribed.

It takes a comma-separated list of Observables as arguments.

For example, we can use it as follows:

import { race, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = race(observable1, observable2);  
combined.subscribe(x => console.log(x));

Both Observables emit synchronously, but race subscribes to observable1 first, so observable1 emits data before observable2. We should get the output:

1  
2  
3

since observable1 emits values first.
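
The winner-takes-all rule can be sketched in plain JavaScript without RxJS. In this model each source is described only by when it first emits and which values it would emit. The sources array, its field names, and the raceModel helper are all hypothetical and exist only for illustration.

```javascript
// Each hypothetical source: when it first emits (ms) and what it would emit.
const sources = [
  { name: "slow", firstEmitAt: 3000, values: ["s1", "s2"] },
  { name: "fast", firstEmitAt: 1000, values: ["f1", "f2"] },
  { name: "medium", firstEmitAt: 2000, values: ["m1", "m2"] }
];

// Model race: the source that emits first wins and the others are dropped.
// reduce only replaces the winner on a strictly smaller time,
// so on a tie the earliest argument wins.
function raceModel(candidates) {
  return candidates.reduce((winner, s) =>
    s.firstEmitAt < winner.firstEmitAt ? s : winner
  );
}

const winner = raceModel(sources);
console.log(winner.name, winner.values);
```

Here the "fast" source wins and only its values are kept. The tie rule mirrors the of example above, where both inputs emit immediately and the first argument wins.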

zip

The zip operator combines multiple Observables and returns an Observable whose values are computed from the values of each input Observable, taken in order: the first values of all inputs are combined, then the second values, and so on.

It takes a list of Observables as arguments. We can use it as follows:

import { zip, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = zip(observable1, observable2);  
combined.subscribe(x => console.log(x));

Then we get the following:

[1, 4]  
[2, 5]  
[3, 6]

We can also map the pairs to objects, which makes values from one Observable easier to distinguish from the other:

import { zip, of } from "rxjs";  
import { map } from "rxjs/operators";
const age$ = of(1, 2, 3);  
const name$ = of("John", "Mary", "Jane");  
const combined = zip(age$, name$);  
combined  
  .pipe(map(([age, name]) => ({ age, name })))  
  .subscribe(x => console.log(x));
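
The pairing rule can also be modeled on plain arrays, without RxJS. The sketch below uses a hypothetical zipArrays helper and deliberately makes one input shorter than the other, to show that a value with no partner is never paired. It illustrates the idea and is not the library's implementation.

```javascript
// Model zip: pair the i-th value of every input and stop at the shortest input.
function zipArrays(...arrays) {
  if (arrays.length === 0) return [];
  const length = Math.min(...arrays.map(a => a.length));
  return Array.from({ length }, (_, i) => arrays.map(a => a[i]));
}

const ages = [1, 2, 3];
const names = ["John", "Mary"]; // one value short on purpose

// Same mapping step as in the RxJS example: turn each pair into an object.
const people = zipArrays(ages, names).map(([age, name]) => ({ age, name }));
console.log(people);
```

Only two objects are produced, because the third age has no matching name.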

Transformation Operators

buffer

The buffer operator buffers the source Observable values until the closingNotifier emits.

It takes one argument, the closingNotifier. It’s an Observable that signals when the current buffer should be emitted on the output Observable.

For example, we can use it as follows:

import { fromEvent, timer } from "rxjs";  
import { buffer } from "rxjs/operators";
const observable = timer(1000, 1000);  
const clicks = fromEvent(document, "click");  
const buffered = observable.pipe(buffer(clicks));  
buffered.subscribe(x => console.log(x));

In the code above, the timer operator creates an Observable that waits 1 second and then emits a number every second. We pipe it through buffer and pass in the clicks Observable, which emits whenever the document is clicked, as the closingNotifier.

Each click therefore emits an array of the values that were buffered since the previous click. Depending on how much time passes between clicks, that array can be anything from empty to several values long.
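
The relationship between ticks and clicks can be sketched in plain JavaScript, without RxJS or a browser. The bufferModel helper and the click times below are hypothetical. The sketch assumes both lists are sorted by time, ignores what happens when the source completes, and treats a value emitted at exactly a click time as belonging to the next buffer, which is a choice of this model only.

```javascript
// Model buffer: at each notifier time, emit the source values
// collected since the previous notifier time.
function bufferModel(sourceEvents, notifierTimes) {
  const buffers = [];
  let index = 0;
  for (const closeAt of notifierTimes) {
    const current = [];
    while (index < sourceEvents.length && sourceEvents[index].time < closeAt) {
      current.push(sourceEvents[index].value);
      index++;
    }
    buffers.push(current);
  }
  return buffers;
}

// timer(1000, 1000) modeled for five seconds: 0 at 1000ms, 1 at 2000ms, ...
const ticks = [0, 1, 2, 3, 4].map(n => ({ time: (n + 1) * 1000, value: n }));

// Hypothetical click times in milliseconds.
const clickTimes = [500, 2500, 2600, 4800];

console.log(bufferModel(ticks, clickTimes));
```

The model produces four arrays, one per click: an empty one for the click before the first tick, [0, 1], another empty one for the quick second click, and [2, 3].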

bufferCount

bufferCount is slightly different from buffer in that it buffers the data until the buffer’s size reaches bufferSize.

It takes 2 arguments: bufferSize, which is the maximum size of each buffer, and the optional startBufferEvery, which is the interval, counted in source values, at which to start a new buffer.

For example, we can use it as follows:

import { fromEvent } from "rxjs";  
import { bufferCount } from "rxjs/operators";  
const clicks = fromEvent(document, "click");  
const buffered = clicks.pipe(bufferCount(10));  
buffered.subscribe(x => console.log(x));

The code above emits an array of the buffered MouseEvent objects once we have clicked 10 times, since that is when the originating Observable has emitted 10 MouseEvent objects.
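
The effect of startBufferEvery is easier to see on a small list of numbers. The following plain-JavaScript sketch uses a hypothetical bufferCountModel helper, not RxJS itself. It models a source that never completes, like the click stream above, so partially filled buffers are never emitted.

```javascript
// Model bufferCount(bufferSize, startBufferEvery) over a list of values.
// A new buffer is opened every `startBufferEvery` values and is emitted
// as soon as it holds `bufferSize` values.
function bufferCountModel(values, bufferSize, startBufferEvery = bufferSize) {
  const emitted = [];
  let open = [];
  values.forEach((value, count) => {
    if (count % startBufferEvery === 0) open.push([]);
    for (const buffer of open) buffer.push(value);
    // Emit the buffers that are now full and keep the rest open.
    emitted.push(...open.filter(b => b.length >= bufferSize));
    open = open.filter(b => b.length < bufferSize);
  });
  return emitted;
}

const values = [1, 2, 3, 4, 5, 6];
console.log(bufferCountModel(values, 2));    // back-to-back buffers of two
console.log(bufferCountModel(values, 3, 1)); // sliding window of three
console.log(bufferCountModel(values, 2, 3)); // values 3 and 6 fall between buffers
```

When startBufferEvery is smaller than bufferSize the buffers overlap, and when it is larger some values are skipped.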

As we can see, the join creation operators let us combine Observables’ emitted data in several ways. We can take values only from the Observable that emits first with race, pair up values by position with zip, and receive all values concurrently with merge.

Also, we can buffer an Observable’s emitted data and emit it either when a given amount has been buffered or when a triggering event occurs.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
