More Rxjs Transformation Operators — Scan and Window

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at how to use the transformation operators scan, switchMap, switchMapTo and window.

scan

The scan operator applies an accumulator function over the source Observable by combining the emitted values. It emits each intermediate result and accepts an optional seed value.

It takes up to 2 arguments. The first is the accumulator function, which is the function that combines the value that was accumulated so far with the new emitted value.

The second is an optional argument, which is the seed, or the initial accumulation value. If no seed is given, the first emitted value is used as the initial accumulation.

For example, we can use it as follows:

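import { of } from "rxjs";
import { scan } from "rxjs/operators";
// Source Observable that emits 1, 2 and 3.
const of$ = of(1, 2, 3);
// Initial accumulation value.
const seed = 0;
// Add each emitted value to the running total.
const count = of$.pipe(scan((acc, val) => acc + val, seed));
// Logs 1, 3, 6.
count.subscribe(x => console.log(x));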

The code above starts with the of$ Observable, which emits 1, 2 and 3, and a seed value of 0. We pipe the values from of$ to the scan operator, whose accumulator function adds each newly emitted value to the value accumulated so far.

Then we subscribe to the resulting count Observable, which logs the running totals 1, 3 and 6.
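
To make the accumulation concrete, here is a minimal sketch in plain JavaScript that models what scan does on a finite list of values. The scanArray helper is hypothetical and is not part of RxJS. It only mirrors the operator’s behavior on an array, and compares it with reduce, which keeps only the final result.

```javascript
// Hypothetical helper (not part of RxJS): models scan on a plain array.
// It returns every intermediate accumulation, not just the final one.
function scanArray(values, accumulator, seed) {
  const results = [];
  let acc = seed;
  for (const val of values) {
    acc = accumulator(acc, val);
    results.push(acc);
  }
  return results;
}

const runningTotals = scanArray([1, 2, 3], (acc, val) => acc + val, 0);
console.log(runningTotals); // [1, 3, 6]

// reduce, by contrast, only yields the final accumulation.
const total = [1, 2, 3].reduce((acc, val) => acc + val, 0);
console.log(total); // 6
```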

switchMap

The switchMap operator projects each source value to an Observable, which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.

It takes up to 2 arguments. The first is a project function which takes the emitted value of the source Observable as a parameter and then returns a new Observable from it.

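The second is an optional resultSelector argument. It’s a function that lets us select the result from the emitted values of the new Observable. It has been deprecated since RxJS 6, so it’s better to apply map inside the project function instead.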

We can use it as follows:

import { of } from "rxjs";
import { switchMap } from "rxjs/operators";
// Map each source value x to the inner Observable of(x, x * 2, x * 3).
const switched = of(1, 2, 3).pipe(switchMap(x => of(x, x * 2, x * 3)));
switched.subscribe(x => console.log(x));

The code above pipes the values emitted by the of(1, 2, 3) Observable into the switchMap operator. Its project function maps each emitted value x, which is 1, 2 or 3, to the inner Observable of(x, x * 2, x * 3).

This means that we get 1, 1*2 and 1*3, which are 1, 2 and 3. Then the same is done with 2, so we get 2, 2*2 and 2*3, which are 2, 4 and 6. Finally, we get 3, 3*2 and 3*3, which are 3, 6 and 9. Nothing is dropped here because each inner Observable emits synchronously and completes before the next source value arrives.

So we get the following output:

1  
2  
3  
2  
4  
6  
3  
6  
9
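
In the example above every inner Observable emits synchronously, so the switching never discards anything. The sketch below is a simplified timeline model in plain JavaScript, not RxJS itself, that shows what happens when inner values take time to arrive. The simulateSwitchMap function and the timings are made up for illustration, and the model assumes that an inner value due at exactly the moment of the next source emission is dropped.

```javascript
// Hypothetical model (not RxJS itself): simulates switchMap on a timeline.
// sourceEvents: [{ time, value }] sorted by time (milliseconds).
// project(value) returns the inner emissions as [{ delay, value }].
function simulateSwitchMap(sourceEvents, project) {
  const output = [];
  sourceEvents.forEach((event, i) => {
    // The inner Observable is dropped when the next source value arrives.
    const next = sourceEvents[i + 1];
    const cutoff = next ? next.time : Infinity;
    for (const inner of project(event.value)) {
      const emitTime = event.time + inner.delay;
      // Assumption: a value due exactly at the cutoff is dropped.
      if (emitTime < cutoff) {
        output.push({ time: emitTime, value: inner.value });
      }
    }
  });
  return output;
}

// Source emits 1, 2, 3 at 0 ms, 1500 ms and 3000 ms (made-up timings).
const source = [
  { time: 0, value: 1 },
  { time: 1500, value: 2 },
  { time: 3000, value: 3 }
];
// Each inner emits x, x * 2, x * 3, one value per second.
const project = x => [
  { delay: 1000, value: x },
  { delay: 2000, value: x * 2 },
  { delay: 3000, value: x * 3 }
];

const switchedValues = simulateSwitchMap(source, project).map(e => e.value);
console.log(switchedValues); // [1, 2, 3, 6, 9]
```

In this model the inner sequences for 1 and 2 are cut off after their first value, and only the last one, for 3, runs to completion.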

switchMapTo

The switchMapTo operator projects each source value to the same Observable. That Observable is then flattened into the output Observable in the same way as with switchMap.

It takes up to 2 arguments. The first is the Observable that replaces each emitted value of the source Observable.

The second is an optional argument, which is the resultSelector function that lets us select the value from the new Observable. Like the one for switchMap, it has been deprecated since RxJS 6.

For example, we can use it as follows:

import { of, interval } from "rxjs";
import { switchMapTo, take } from "rxjs/operators";
// Switch to the same inner Observable for every source value.
const switched = of(1, 2, 3).pipe(switchMapTo(interval(1000).pipe(take(3))));
// Logs 0, 1, 2, one value per second.
switched.subscribe(x => console.log(x));

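The code above maps each value emitted by the of(1, 2, 3) Observable to interval(1000).pipe(take(3)), which emits 0, 1 and 2 spaced 1 second apart. Because of(1, 2, 3) emits all its values synchronously, the inner subscriptions created for 1 and 2 are dropped before they emit anything, and only the one created for 3 runs to completion.

The result is that we get 0, 1 and 2 as the output.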
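
The effect of the source timing can be sketched with a small timeline model in plain JavaScript. This is not RxJS itself: simulateSwitchMapTo is a hypothetical helper, the synchronous emissions of of(1, 2, 3) are modelled as all happening at time 0, and the second call uses made-up timings to show a source whose values are spaced apart.

```javascript
// Hypothetical model (not RxJS itself): simulates switchMapTo on a timeline.
// sourceTimes: times (ms) at which the source emits, sorted ascending.
// innerEmissions: [{ delay, value }] emitted by the one shared inner Observable.
function simulateSwitchMapTo(sourceTimes, innerEmissions) {
  const output = [];
  sourceTimes.forEach((start, i) => {
    // Every source value restarts the same inner Observable; the previous
    // subscription is dropped as soon as the next source value arrives.
    const cutoff = i + 1 < sourceTimes.length ? sourceTimes[i + 1] : Infinity;
    for (const { delay, value } of innerEmissions) {
      if (start + delay < cutoff) output.push(value);
    }
  });
  return output;
}

// interval(1000).pipe(take(3)) emits 0, 1, 2 at 1000, 2000 and 3000 ms.
const innerEmissions = [
  { delay: 1000, value: 0 },
  { delay: 2000, value: 1 },
  { delay: 3000, value: 2 }
];

// of(1, 2, 3) emits all three values synchronously, modelled as time 0.
const synchronousResult = simulateSwitchMapTo([0, 0, 0], innerEmissions);
console.log(synchronousResult); // [0, 1, 2]

// A source that emits at 0 ms and 2500 ms (made-up timings).
const spacedResult = simulateSwitchMapTo([0, 2500], innerEmissions);
console.log(spacedResult); // [0, 1, 0, 1, 2]
```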

window

The window operator branches out the source Observable values as a nested Observable whenever the windowBoundaries Observable emits.

It takes one argument, which is the windowBoundaries Observable that’s used to complete the previous window and start a new window.

Like buffer, it groups the emitted values until the boundary Observable emits, but it emits a nested Observable for each group instead of an array. Values flow through the current window as they arrive rather than being held back until the window closes.

For instance, we can use it as follows:

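import { interval, timer } from "rxjs";
import { window, mergeAll, map, take } from "rxjs/operators";
// Emits after 3 seconds, then every second.
const timer$ = timer(3000, 1000);
// Window boundary: emits every 6 seconds.
const sec = interval(6000);
const result = timer$.pipe(
  window(sec), // split the source into nested window Observables
  map(win => win.pipe(take(2))), // keep the first 2 values of each window
  mergeAll() // flatten the windows back into one stream
);
result.subscribe(x => console.log(x));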

The code above has the timer(3000, 1000) Observable, which emits its first value 3 seconds after we subscribe to it and then one value every second.

We also have a sec Observable that emits numbers every 6 seconds. We use it as the boundary for the window operator, so the values from timer$ are split into consecutive windows, each of which is a nested Observable.

These windows are then piped to the map operator, which applies take(2) to each window so that only its first 2 values are kept. Then all the results are merged together with mergeAll.

In the end, we get the first 2 values that timer$ emits in each 6-second window.
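
The windowing can be sketched as a simplified timeline model in plain JavaScript. This is not RxJS itself: windowAndTake is a hypothetical helper, and the model assumes that a value emitted at exactly a boundary time lands in the new window, which the real operator may or may not do depending on timer ordering.

```javascript
// Hypothetical model (not RxJS itself): splits timestamped values into
// windows at the given boundary times, then keeps the first `count` of each.
function windowAndTake(events, boundaries, count) {
  const windows = [[]];
  let b = 0;
  for (const event of events) {
    // Open a new window for every boundary that has passed. Assumption: a
    // value emitted exactly at a boundary time goes into the new window.
    while (b < boundaries.length && event.time >= boundaries[b]) {
      windows.push([]);
      b++;
    }
    windows[windows.length - 1].push(event.value);
  }
  return windows.map(win => win.slice(0, count));
}

// timer(3000, 1000): values 0, 1, 2, ... at 3000 ms, 4000 ms, 5000 ms, ...
const events = [];
for (let i = 0; i < 12; i++) {
  events.push({ time: 3000 + i * 1000, value: i });
}
// interval(6000): boundaries at 6000 ms and 12000 ms.
const boundaries = [6000, 12000];

const firstTwoPerWindow = windowAndTake(events, boundaries, 2);
console.log(firstTwoPerWindow); // [[0, 1], [3, 4], [9, 10]]
```

The first window only covers the values emitted between 3 and 6 seconds, which is why it holds 0, 1 and 2 before take(2) trims it to 0 and 1.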

The scan operator applies an accumulator function over the source Observable by combining the emitted values. It emits each intermediate result and accepts an optional seed value.

switchMap projects each source value to an Observable which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.

Like switchMap, switchMapTo projects each source value to an Observable. But unlike switchMap, it projects every value to the same Observable, which is then flattened into the output Observable in the same way as with switchMap.

The window operator branches out the source Observable values as a nested Observable whenever the windowBoundaries Observable, which is used for closing and opening the window, emits.
