Rxjs Operators — Utility Operators

RxJS is a library for reactive programming. Its creation operators generate data from various sources for Observers to subscribe to, and its utility operators help us work with those emissions.

In this article, we’ll look at some utility operators that help us do various things: tap, delay, delayWhen, and dematerialize.

tap

The tap operator returns an Observable that lets us perform side effects for every emission from the source Observable and then emits the same values as the source.

It takes 3 optional arguments. The first is nextOrObserver, which is either an Observer object or a callback for next. The second is an error callback, which is called when the source emits an error. The last is the complete callback, which is called when the source Observable completes.

It lets us intercept each emission from the source and run a function at that point. The returned Observable then emits the same output as the source Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { tap } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  tap(  
    val => console.log(`value: ${val}`),  
    error => console.log(error),  
    () => console.log("complete")  
  )  
);  
result.subscribe(x => console.log(x));

Then we should get:

value: 1  
1  
value: 2  
2  
value: 3  
3  
complete

since we logged the values emitted with:

val => console.log(`value: ${val}`)

And we logged 'complete' when the of$ Observable completes by writing:

() => console.log("complete")
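To make the pass-through behavior concrete, here is a toy model written without RxJS. It is only a sketch of the idea, not how the library implements tap: toyOf and toyTap are hypothetical names, and a "source" here is simply a function that pushes values into an observer object.

```javascript
// Toy model only: a "source" is a function that takes an observer
// ({ next, error, complete }) and pushes notifications into it.
const toyOf = (...values) => observer => {
  values.forEach(v => observer.next(v));
  observer.complete();
};

// toyTap wraps a source: it runs the side-effect callback first, then
// forwards the untouched notification to the downstream observer.
const toyTap = (onNext, onError, onComplete) => source => observer =>
  source({
    next: v => { if (onNext) onNext(v); observer.next(v); },
    error: e => { if (onError) onError(e); observer.error(e); },
    complete: () => { if (onComplete) onComplete(); observer.complete(); }
  });

const sideEffects = [];
const received = [];

const tapped = toyTap(
  // The return value of the callback is ignored, so v * 100 never reaches the subscriber.
  v => { sideEffects.push(`value: ${v}`); return v * 100; },
  undefined,
  () => sideEffects.push("complete")
)(toyOf(1, 2, 3));

tapped({
  next: v => received.push(v),
  error: () => {},
  complete: () => received.push("done")
});

console.log(sideEffects); // [ 'value: 1', 'value: 2', 'value: 3', 'complete' ]
console.log(received);    // [ 1, 2, 3, 'done' ]
```

The subscriber still receives 1, 2, and 3 even though the callback returns a different number, which is the same property that makes tap suitable for logging and other side effects.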

delay

The delay operator returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

It takes up to 2 arguments. The first is delay, which is either the delay duration in milliseconds or a Date until which the emission of the source items is delayed.

The second argument is an optional scheduler, which defaults to async. It lets us change the timing mechanism used to emit the values from the returned Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delay(1000));  
result.subscribe(x => console.log(x));

The code above delays the emission of values from the of$ Observable by one second with delay(1000).

We should get 1, 2, and 3 emitted after the delay.
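The role of the scheduler argument can be pictured with a toy model that does not use RxJS. In this sketch, ToyVirtualClock and toyDelay are hypothetical names: the clock stands in for a scheduler, and swapping it for a virtual one lets the delayed values be inspected without waiting in real time. It is an illustration under those assumptions, not the library's implementation.

```javascript
// Hypothetical virtual clock: tasks are queued with a due time and
// run in due-time order when flush() is called.
class ToyVirtualClock {
  constructor() {
    this.now = 0;
    this.queue = [];
  }
  schedule(task, delayMs) {
    this.queue.push({ due: this.now + delayMs, task });
  }
  flush() {
    while (this.queue.length > 0) {
      // Array.prototype.sort is stable, so tasks with equal due times keep their order.
      this.queue.sort((a, b) => a.due - b.due);
      const { due, task } = this.queue.shift();
      this.now = due;
      task();
    }
  }
}

// toyDelay: forward every value delayMs later, measured on the given clock.
const toyDelay = (delayMs, clock) => values => onNext =>
  values.forEach(v => clock.schedule(() => onNext(v), delayMs));

const clock = new ToyVirtualClock();
const log = [];

toyDelay(1000, clock)([1, 2, 3])(v => log.push({ at: clock.now, value: v }));

const countBeforeFlush = log.length; // nothing has been emitted yet
clock.flush();                       // advance virtual time

console.log(countBeforeFlush); // 0
console.log(log);
// [ { at: 1000, value: 1 }, { at: 1000, value: 2 }, { at: 1000, value: 3 } ]
```

All three values are shifted by the same amount, so their relative order is preserved.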

Also, we can delay emission to a specific date as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
import moment from "moment";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  delay(  
    moment()  
      .add(5, "seconds")  
      .toDate()  
  )  
);  
result.subscribe(x => console.log(x));

The code above will delay the emission of values from of$ by 5 seconds from the current date and time.

The emitted values are unchanged: we still get 1, 2, and 3, only later.
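moment is only used above to build a Date, so the built-in Date API can do the same job if we would rather avoid the extra dependency. A minimal sketch, where secondsFromNow is a hypothetical helper name:

```javascript
// Build a Date that lies the given number of seconds in the future.
const secondsFromNow = seconds => new Date(Date.now() + seconds * 1000);

const before = Date.now();
const target = secondsFromNow(5);

console.log(target instanceof Date); // true
console.log(target.getTime() - before >= 5000); // true
```

The resulting Date can be passed to delay in place of the moment expression.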

delayWhen

The delayWhen operator returns an Observable that delays the emission from the source Observable by a given time span determined by the emission of another Observable.

It takes up to 2 arguments. The first is the delayDurationSelector function, which returns an Observable for each value emitted by the source Observable. When the Observable returned by this function emits, the corresponding value from the source Observable is emitted.

The second argument is the optional subscriptionDelay. It’s an Observable that triggers the subscription to the source Observable once it emits any value.

For instance, we can use it as follows:

import { of, interval } from "rxjs";  
import { delayWhen } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delayWhen(val => interval(Math.random() * 5000)));  
result.subscribe(x => console.log(x));

The code above delays each value emitted by the of$ Observable by its own random duration of up to 5000 milliseconds, since Math.random() * 5000 is evaluated once per value. Each value is emitted when its interval(Math.random() * 5000) Observable first emits.

As a result, 1, 2, and 3 are emitted in a random order.
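The reordering is easier to see with fixed durations in place of random ones. The sketch below does not use RxJS; it only works out the emission order arithmetically, and the durations in hypotheticalDurations are made-up stand-ins for Math.random() * 5000.

```javascript
// Made-up per-value delays in milliseconds, standing in for Math.random() * 5000.
const hypotheticalDurations = { 1: 3000, 2: 1000, 3: 2000 };

const sourceValues = [1, 2, 3];

// Each value is released when its own duration elapses, so the emission
// order is the source values sorted by their due time.
const emittedOrder = sourceValues
  .map(value => ({ value, due: hypotheticalDurations[value] }))
  .sort((a, b) => a.due - b.due)
  .map(entry => entry.value);

console.log(emittedOrder); // [ 2, 3, 1 ]
```

With random durations, the due times differ from run to run, which is why the order changes between runs.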

dematerialize

The dematerialize operator returns an Observable that turns an Observable of Notification objects into the emissions that they represent.

It takes no parameters.

For example, we can use it as follows:

import { of, Notification } from "rxjs";  
import { dematerialize } from "rxjs/operators";

const notifA = new Notification("N", 1);  
const notifB = new Notification("N", 2);  
const notifE = new Notification("E", undefined, new Error("error"));  
const materialized = of(notifA, notifB, notifE);  
const dematerialized = materialized.pipe(dematerialize());  
dematerialized.subscribe(x => console.log(x), e => console.error(e));

Then we should get:

1  
2  
Error: error

This is because dematerialize converts the Notification objects into the values and the error that they represent, which are then emitted by the returned Observable.

The first argument of the Notification constructor is the kind of notification: "N" for a next value and "E" for an error. The second argument is the value emitted for a next notification, and the third is the error for an error notification.
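The mapping from notification kinds to observer calls can be sketched without RxJS. In this toy model, plain objects stand in for Notification instances and toyDematerialize is a hypothetical name; it is an illustration of the idea rather than the library's implementation.

```javascript
// Plain objects standing in for Notification instances.
const toyNotifications = [
  { kind: "N", value: 1 },
  { kind: "N", value: 2 },
  { kind: "E", error: new Error("error") }
];

// Dispatch each notification to the matching observer method.
// An error or complete notification ends the stream.
const toyDematerialize = (notifications, observer) => {
  for (const n of notifications) {
    if (n.kind === "N") {
      observer.next(n.value);
    } else if (n.kind === "E") {
      observer.error(n.error);
      return;
    } else if (n.kind === "C") {
      observer.complete();
      return;
    }
  }
};

const out = [];
toyDematerialize(toyNotifications, {
  next: v => out.push(v),
  error: e => out.push(`Error: ${e.message}`),
  complete: () => out.push("complete")
});

console.log(out); // [ 1, 2, 'Error: error' ]
```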

tap returns an Observable that lets us perform some side effects for every emission on the source Observable and then emit the same values from the source.

delay returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

delayWhen returns an Observable that delays the emission from the source Observable by a given time span which is determined by the emission of another Observable.

dematerialize returns an Observable object that emits the values set in the Notification objects emitted by the source Observable.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
