RxJS is a library for reactive programming. Creation operators generate data from various data sources for Observers to subscribe to, while utility operators help us inspect and control how that data is delivered.
In this article, we’ll look at some utility operators, including the materialize, observeOn, subscribeOn, timeInterval, and timestamp operators.
materialize
The materialize operator maps every notification from the source Observable (emitted values, errors, and completion) into Notification objects.
It takes no arguments and returns an Observable that emits Notification objects. For emitted values, the value from the source Observable is stored in the value property of the Notification object.
For example, we can use it as follows:
import { of } from "rxjs";
import { materialize } from "rxjs/operators";
const of$ = of(1, 2, 3);
const materialized = of$.pipe(materialize());
materialized.subscribe(x => console.log(x));
The code above takes each value of the of$ Observable, wraps it in a Notification object, and emits it.
Then we get the following objects, where the last one, with kind "C", represents the completion of the source Observable:
{
"kind": "N",
"value": 1,
"hasValue": true
}
{
"kind": "N",
"value": 2,
"hasValue": true
}
{
"kind": "N",
"value": 3,
"hasValue": true
}
{
"kind": "C",
"hasValue": false
}
Errors from the source Observable are also emitted as Notification objects:
import { of } from "rxjs";
import { materialize, map } from "rxjs/operators";
const of$ = of(1, 2, 3);
const upperCase = of$.pipe(map(x => x.toUpperCase()));
const materialized = upperCase.pipe(materialize());
materialized.subscribe(x => console.log(x));
Then we get:
{
"kind": "E",
"error": {},
"hasValue": false
}
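since the values emitted by the source Observable are numbers, which have no toUpperCase method. The error property holds the TypeError that was thrown, which shows up as {} when it’s serialized to JSON.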
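To make the mapping more concrete, here is a minimal plain-JavaScript sketch of the idea behind materialize. It is a conceptual model under our own assumptions, not RxJS’s implementation: materializeModel and the source functions are hypothetical names, and a source is simply a function that calls next, error, or complete on an observer object.

```javascript
// Conceptual model only: `source` is a hypothetical function that receives
// an observer and calls next/error/complete on it, like an Observable would.
function materializeModel(source) {
  const notifications = [];
  let done = false;
  const observer = {
    next(value) {
      if (!done) notifications.push({ kind: "N", value, hasValue: true });
    },
    error(error) {
      if (!done) {
        done = true;
        notifications.push({ kind: "E", error, hasValue: false });
      }
    },
    complete() {
      if (!done) {
        done = true;
        notifications.push({ kind: "C", hasValue: false });
      }
    }
  };
  try {
    source(observer);
  } catch (err) {
    // A thrown exception becomes an error notification.
    observer.error(err);
  }
  return notifications;
}

// Mirrors of(1, 2, 3): three values, then completion.
const ok = materializeModel(o => {
  [1, 2, 3].forEach(x => o.next(x));
  o.complete();
});

// Mirrors the failing map: numbers have no toUpperCase method,
// so a TypeError is thrown before any value is emitted.
const failed = materializeModel(o => {
  [1, 2, 3].forEach(x => o.next(x.toUpperCase()));
  o.complete();
});

console.log(ok);     // three "N" notifications followed by one "C"
console.log(failed); // a single "E" notification holding the TypeError
```

The point of the sketch is that values, errors, and completion all end up as ordinary data, so they can be logged or stored like any other emitted value.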
observeOn
The observeOn operator returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.
It takes up to 2 arguments. The first is the scheduler that we’ll use to reschedule notifications from the source Observable.
The second is the optional delay argument, which defaults to 0. It’s the number of milliseconds by which every rescheduled notification is delayed.
We shouldn’t use observeOn on source Observables that emit lots of values synchronously, since every notification is rescheduled individually. The delay operator is a better choice for delaying the emission of values from the source Observable.
For example, we can use it as follows:
import { interval, animationFrameScheduler } from "rxjs";
import { observeOn, take } from "rxjs/operators";
const intervals = interval(10).pipe(take(10));
const animationInterval = intervals.pipe(observeOn(animationFrameScheduler));
animationInterval.subscribe(x => console.log(x));
We re-emit the values from the intervals Observable on the animationFrameScheduler, so each value is delivered just before the browser’s next repaint, which helps smooth out changes for animations.
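The rescheduling itself can be pictured with a small plain-JavaScript model. This is only a sketch under our own assumptions: createVirtualScheduler and observeOnModel are hypothetical helpers, and the scheduler runs its tasks in virtual time when flush() is called instead of using real timers.

```javascript
// Hypothetical virtual-time scheduler: tasks run only when flush() is called.
function createVirtualScheduler() {
  const tasks = [];
  let seq = 0;
  let time = 0;
  return {
    now: () => time,
    schedule(work, delay = 0) {
      tasks.push({ due: time + delay, seq: seq++, work });
    },
    flush() {
      while (tasks.length > 0) {
        // Run the earliest task first; ties keep their scheduling order.
        tasks.sort((a, b) => a.due - b.due || a.seq - b.seq);
        const task = tasks.shift();
        time = task.due;
        task.work();
      }
    }
  };
}

// Model of observeOn: every notification is handed to the scheduler
// instead of being delivered to the observer immediately.
function observeOnModel(values, scheduler, delay = 0) {
  return observer => {
    values.forEach(v => scheduler.schedule(() => observer.next(v), delay));
    scheduler.schedule(() => observer.complete(), delay);
  };
}

const scheduler = createVirtualScheduler();
const log = [];
const subscribe = observeOnModel([1, 2, 3], scheduler, 5);

subscribe({
  next: v => log.push(`next ${v} at ${scheduler.now()}`),
  complete: () => log.push("complete")
});
log.push("subscribed"); // nothing has been delivered yet
scheduler.flush();      // now the scheduled notifications run

console.log(log);
// [ 'subscribed', 'next 1 at 5', 'next 2 at 5', 'next 3 at 5', 'complete' ]
```

In this model, all three values were produced at the same moment, so they all arrive at virtual time 5. The delay shifts every notification by the same amount rather than spacing them out.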
subscribeOn
The subscribeOn operator returns an Observable that subscribes Observers to the source Observable on the specified scheduler, which usually means asynchronously.
It takes up to 2 arguments. The first is the scheduler to perform the subscription action on, and the second is an optional delay argument, which defaults to 0.
For example, we can use it as follows:
import { of, merge, asyncScheduler } from "rxjs";
import { subscribeOn } from "rxjs/operators";
const a = of("a", "b", "c").pipe(subscribeOn(asyncScheduler));
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
Then we get:
5
6
7
8
9
a
b
c
from the console.log since a is converted to an asynchronous Observable by subscribeOn(asyncScheduler), while b still emits values synchronously.
The subscription to a is scheduled to run after the current synchronous code finishes. Therefore, its values are emitted later than the synchronous ones from b.
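The ordering can be reproduced with a small plain-JavaScript model in which the asynchronous scheduler is replaced by a task queue that we drain by hand. This is a sketch under our own assumptions, not how RxJS implements subscribeOn or merge. The names queue, schedule, drain, fromValues, and subscribeOnModel are all hypothetical.

```javascript
// Hypothetical stand-in for an asynchronous scheduler: a task queue drained later.
const queue = [];
const schedule = task => queue.push(task);
const drain = () => {
  while (queue.length > 0) queue.shift()();
};

// A synchronous source in the spirit of of(...).
const fromValues = (...values) => observer =>
  values.forEach(v => observer.next(v));

// Model of subscribeOn: the subscription itself is what gets scheduled.
const subscribeOnModel = source => observer =>
  schedule(() => source(observer));

const output = [];
const observer = { next: v => output.push(v) };

const a = subscribeOnModel(fromValues("a", "b", "c"));
const b = fromValues(5, 6, 7, 8, 9);

// Model of merge(a, b): subscribe to each source in order.
a(observer); // only queues the subscription
b(observer); // emits 5 to 9 immediately
drain();     // the queued subscription now runs and emits a, b, c

console.log(output); // [ 5, 6, 7, 8, 9, 'a', 'b', 'c' ]
```

Unlike observeOn, which reschedules each notification, only the act of subscribing is deferred here. Once the subscription runs, the source emits its values in the usual way.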
timeInterval
The timeInterval operator returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value. The time difference is calculated with the scheduler’s now() method, which returns the current time for each emission.
It takes one optional argument, the scheduler, which is used to get the current time.
The scheduler defaults to the async scheduler, so by default, the time is measured in milliseconds.
For example, we can use it as follows:
import { interval } from "rxjs";
import { timeInterval, take } from "rxjs/operators";
const a = interval(1000).pipe(take(3));
a.pipe(timeInterval()).subscribe(
value => console.log(value),
err => console.log(err)
);
The values emitted by the a Observable are piped to the timeInterval operator, which emits an object with the value from the source Observable along with the time interval between the current and previous emitted values.
Then we get:
{
"value": 0,
"interval": 1001
}
{
"value": 1,
"interval": 1000
}
{
"value": 2,
"interval": 999
}
from the console.log. The exact intervals vary slightly from run to run since timers aren’t perfectly precise.
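The role of the scheduler’s now() method can be sketched in plain JavaScript by passing in the clock as a function. This is a conceptual model under our own assumptions: createTimeInterval and fakeNow are hypothetical names, and the fake clock values are chosen only to make the result deterministic.

```javascript
// Model of timeInterval: `now` is a hypothetical clock function standing in
// for the scheduler's now() method.
function createTimeInterval(now) {
  let last = now(); // the time at "subscription"
  return value => {
    const current = now();
    const interval = current - last;
    last = current;
    return { value, interval };
  };
}

// Fake clock: one reading at subscription, then one per emitted value.
const ticks = [0, 1001, 2001, 3000];
let tickIndex = 0;
const fakeNow = () => ticks[tickIndex++];

const withInterval = createTimeInterval(fakeNow);
const results = [0, 1, 2].map(v => withInterval(v));

console.log(results);
// [ { value: 0, interval: 1001 },
//   { value: 1, interval: 1000 },
//   { value: 2, interval: 999 } ]
```

Because the clock is injected, swapping in a different time source changes the unit and origin of the measured intervals, which is what the optional scheduler argument allows.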
timestamp
The timestamp operator returns an Observable that adds a timestamp to each item emitted by the source Observable to indicate when it was emitted.
It takes an optional scheduler argument, which defaults to the async scheduler.
For example, we can use it as follows:
import { interval } from "rxjs";
import { timestamp, take } from "rxjs/operators";
const a = interval(1000).pipe(take(3));
a.pipe(timestamp()).subscribe(
value => console.log(value),
err => console.log(err)
);
The code above takes the items emitted by the a Observable and pipes them to the timestamp operator, which emits an object that has the value emitted by the source Observable along with the timestamp of when it was emitted.
Then we get:
{
"value": 0,
"timestamp": 1576632836715
}
{
"value": 1,
"timestamp": 1576632837715
}
{
"value": 2,
"timestamp": 1576632838715
}
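With the default scheduler, the timestamps are milliseconds since the Unix epoch, so they can be turned into dates or subtracted from each other. The following plain-JavaScript sketch works on the sample objects above. The names emitted, readable, and gaps are our own, and the snippet assumes the default scheduler was used.

```javascript
// Sample objects copied from the output above.
const emitted = [
  { value: 0, timestamp: 1576632836715 },
  { value: 1, timestamp: 1576632837715 },
  { value: 2, timestamp: 1576632838715 }
];

// Convert each epoch-millisecond timestamp into an ISO 8601 date string.
const readable = emitted.map(({ value, timestamp }) => ({
  value,
  emittedAt: new Date(timestamp).toISOString()
}));

// Differences between consecutive timestamps, in milliseconds.
const gaps = emitted
  .slice(1)
  .map((item, index) => item.timestamp - emitted[index].timestamp);

console.log(readable[0].emittedAt); // 2019-12-18T01:33:56.715Z
console.log(gaps);                  // [ 1000, 1000 ]
```

The differences are the same kind of information that timeInterval reports directly, which is why we’d pick timestamp when we need absolute times and timeInterval when we only need the gaps.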
The materialize operator maps every notification from the source Observable, including errors and completion, into Notification objects.
observeOn returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.
subscribeOn returns an Observable that subscribes Observers to the source Observable on the specified scheduler, which usually means asynchronously.
timeInterval returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value.
timestamp returns an Observable that adds a timestamp to each item emitted by the source Observable to indicate when it was emitted.