Categories
JavaScript Rxjs

Rxjs Operators — More Utility Operators

Spread the love

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some utility operators that help us do various things, including the materialize , observeOn , subscribeOn , timeInterval and timeStamp operators.

materialize

The materialize operator maps all the emitted values from the source Observable into Notification objects.

It takes no arguments and returns an Observable that emits Notification objects with the values from the source Observable as values of the value property of the Notification object.

For example, we can use it as follows:

import { of } from "rxjs";  
import { materialize } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const materialized = of$.pipe(materialize());  
materialized.subscribe(x => console.log(x));

The code above takes the values of the of$ Observable, wrap each in a Notification object and emit it.

Then we’ll get the following objects:

{  
  "kind": "N",  
  "value": 1,  
  "hasValue": true  
}  
{  
  "kind": "N",  
  "value": 2,  
  "hasValue": true  
}  
{  
  "kind": "N",  
  "value": 3,  
  "hasValue": true  
}  
{  
  "kind": "C",  
  "hasValue": false  
}

It’ll emit error values when encountered:

import { of } from "rxjs";  
import { materialize, map } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const upperCase = of$.pipe(map(x => x.toUpperCase()));  
const materialized = upperCase.pipe(materialize());  
materialized.subscribe(x => console.log(x));

Then we get:

{  
  "kind": "E",  
  "error": {},  
  "hasValue": false  
}

since x has no toUpperCase method as they’re numbers emitted by the source Observable.

observeOn

The observeOn operator returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.

It takes up to 2 arguments. The first is the scheduler that we’ll use to reschedule notifications from the source Observable.

The second argument is the optional delay argument, which defaults to 0. It’s the number of milliseconds that represents the delay for every notification that’s rescheduled.

We shouldn’t use observeOn to emit values from source Observables that emits lots of values synchronously. The delay operator is better for delaying the emission of values from the source Observable.

For example, we can use it as follows:

import { interval, animationFrameScheduler } from "rxjs";  
import { observeOn, take } from "rxjs/operators";
const intervals = interval(10).pipe(take(10));  
const animationInterval = intervals.pipe(observeOn(animationFrameScheduler));  
animationInterval.subscribe(x => console.log(x));

We piped the emitted values from the intervals Observable to the animationFrameScheduler , which helps with smoothing out changes for animations.

subscribeOn

The subscribeOn operator returns an Observable that asynchronously subscribes to Observers to the source Observable.

It takes up to 2 arguments. The first is the scheduler to perform the subscription action on, and the second is an optional delay argument, which defaults to 0.

For example, we can use it as follows:

import { of, merge, asyncScheduler } from "rxjs";  
import { subscribeOn } from "rxjs/operators";

const a = of("a", "b", "c").pipe(subscribeOn(asyncScheduler));  
const b = of(5, 6, 7, 8, 9);  
merge(a, b).subscribe(console.log);

Then we get:

5  
6  
7  
8  
9  
a  
b  
c

from the console.log since a is converted to an asynchronous Observable by the subscribeOn(asyncScheduler) , while b still emits values synchronously.

Asynchronous emissions are queued after synchronous ones. Therefore, the values are emitted later than the synchronous ones.

timeInterval

The timeInterval returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value. The time difference is calculated with the scheduler ‘s now() method to get the current time for each emission.

The scheduler defaults to async , so by default, the time is measured in milliseconds.

It takes one optional argument, which is the scheduler , which is used to get the current time.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { timeInterval, take } from "rxjs/operators";

const a = interval(1000).pipe(take(3));  
a.pipe(timeInterval()).subscribe(  
  value => console.log(value),  
  err => console.log(err)  
);

The a Observable’s emitted values are pipe d to the timeInterval operator which emits an object with the value from the source Observable along with the time interval between the current and previous emitted value.

Then we get:

{  
  "value": 0,  
  "interval": 1001  
}  
{  
  "value": 1,  
  "interval": 1000  
}  
{  
  "value": 2,  
  "interval": 999  
}

from the console.log .

timestamp

The timestamp operator returns an Observable that adds a timestamp to each item emitted by the source Observable to indicate when it was emitted.

It takes an optional scheduler argument which defaults to async .

For example, we can use it as follows:

import { interval } from "rxjs";  
import { timestamp, take } from "rxjs/operators";

const a = interval(1000).pipe(take(3));  
a.pipe(timestamp()).subscribe(  
  value => console.log(value),  
  err => console.log(err)  
);

The code above takes the items emitted by the a Observable then pipe it to the timestamp operator, which returns an object that has the value emitted by the source Observable along with the timestamp of when it’s emitted.

Then we get:

{  
  "value": 0,  
  "timestamp": 1576632836715  
}  
{  
  "value": 1,  
  "timestamp": 1576632837715  
}  
{  
  "value": 2,  
  "timestamp": 1576632838715  
}

The materialize operator maps all the emitted values from the source Observable into Notification objects.

observeOn returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.

subscribeOn returns an Observable that asynchronously subscribes to Observers to the source Observable.

timeInterval returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value.

timestamp returns an Observable that adds a timestamp to each item emitted by the source Observable of when it was emitted.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.

Leave a Reply

Your email address will not be published. Required fields are marked *