RxJS is a library for reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.
In this article, we’ll look at how to use the transformation operators scan, switchMap, switchMapTo and window.
scan
The scan operator applies an accumulator function over the source Observable by combining the emitted values. It emits each intermediate result and accepts an optional seed value.
It takes up to 2 arguments. The first is the accumulator function, which combines the value accumulated so far with the newly emitted value.
The second is an optional seed, which is the initial accumulation value.
For example, we can use it as follows:
import { of } from "rxjs";
import { scan } from "rxjs/operators";
const of$ = of(1, 2, 3);
const seed = 0;
const count = of$.pipe(scan((acc, val) => acc + val, seed));
count.subscribe(x => console.log(x));
The code above starts with an of Observable and a seed initial value of 0. Then we pipe the values from the of$ Observable to the scan operator, whose accumulator function adds each newly emitted value to the accumulated value.
Then we subscribe to the count Observable that results from this, which logs the running totals 1, 3 and 6.
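To make the role of the seed more concrete, here is a minimal plain-JavaScript sketch that models what scan computes over a fixed list of values. The helper name scanValues is hypothetical and is not part of RxJS. It only mirrors the accumulate-and-emit behavior described above, including the case where no seed is passed and the first value is emitted unchanged.

```javascript
// Hypothetical helper (not an RxJS API): models scan over a plain array.
// Returns every intermediate accumulation, the way scan emits them.
function scanValues(values, accumulator, ...rest) {
  const hasSeed = rest.length > 0;
  const results = [];
  let acc = hasSeed ? rest[0] : undefined;
  values.forEach((val, index) => {
    if (!hasSeed && index === 0) {
      // Without a seed, the first value becomes the initial accumulation
      // and is passed through without calling the accumulator.
      acc = val;
    } else {
      acc = accumulator(acc, val);
    }
    results.push(acc);
  });
  return results;
}

const add = (acc, val) => acc + val;
const withSeed = scanValues([1, 2, 3], add, 0);
const withoutSeed = scanValues([1, 2, 3], add);
const withOtherSeed = scanValues([1, 2, 3], add, 10);

console.log(withSeed); // [1, 3, 6]
console.log(withoutSeed); // [1, 3, 6]
console.log(withOtherSeed); // [11, 13, 16]
```

With a seed of 0 and addition, the seeded and unseeded results happen to match. A seed of 10 shows how the initial value shifts every intermediate result.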
switchMap
The switchMap operator projects each source value to an Observable, which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.
It takes up to 2 arguments. The first is a project function, which takes the emitted value of the source Observable as a parameter and returns a new Observable from it.
The second is an optional resultSelector argument. It’s a function that lets us select the result from the emitted values of the new Observable.
We can use it as follows:
import { of } from "rxjs";
import { switchMap } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMap(x => of(x, x * 2, x * 3)));
switched.subscribe(x => console.log(x));
The code above takes the values emitted from the of(1, 2, 3) Observable and pipes them into the switchMap operator, whose project function maps each emitted value x to of(x, x * 2, x * 3), where x is 1, 2 or 3.
This means that we first get 1, 1*2 and 1*3, which are 1, 2 and 3. Then the same is done with 2, so we get 2, 2*2 and 2*3, which are 2, 4 and 6. Finally, we get 3, 3*2 and 3*3, which are 3, 6 and 9.
So we get the following output:
1
2
3
2
4
6
3
6
9
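Because of(...) emits synchronously, each inner Observable in the example above completes before the next source value arrives, so nothing is dropped. The switching only becomes visible when inner values are due later than the next source value. The following plain-JavaScript sketch models that on a timeline. The function switchMapTimeline and its event shapes are hypothetical, not RxJS APIs, and the sketch assumes that an inner value with a non-zero delay is dropped if it is due at or after the next source emission.

```javascript
// Hypothetical timeline model of switchMap (not an RxJS API).
// sourceEvents: [{ time, value }] sorted by time.
// project(value): returns [{ delay, value }] describing the inner stream.
function switchMapTimeline(sourceEvents, project) {
  const output = [];
  sourceEvents.forEach((event, index) => {
    const next = sourceEvents[index + 1];
    // The inner stream is unsubscribed when the next source value arrives.
    const cutoff = next ? next.time : Infinity;
    for (const inner of project(event.value)) {
      const time = event.time + inner.delay;
      // Assumption: delay 0 means a synchronous emission, which is always
      // delivered; later emissions must arrive before the cutoff.
      if (inner.delay === 0 || time < cutoff) {
        output.push(inner.value);
      }
    }
  });
  return output;
}

// Synchronous inner values, like of(x, x * 2, x * 3): nothing is dropped.
const source = [1, 2, 3].map(value => ({ time: 0, value }));
const syncResult = switchMapTimeline(source, x =>
  [x, x * 2, x * 3].map(value => ({ delay: 0, value }))
);

// Source values 1 second apart, inner values 600 ms apart: only the first
// inner value fits before each switch, except for the last source value.
const spaced = [1, 2, 3].map((value, i) => ({ time: i * 1000, value }));
const asyncResult = switchMapTimeline(spaced, x =>
  [x, x * 2, x * 3].map((value, i) => ({ delay: (i + 1) * 600, value }))
);

console.log(syncResult); // [1, 2, 3, 2, 4, 6, 3, 6, 9]
console.log(asyncResult); // [1, 2, 3, 6, 9]
```

In the second case, the inner values 2 and 3 for the first source value and 4 and 6 for the second are never delivered, because the source has already moved on by the time they are due.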
switchMapTo
The switchMapTo operator projects each source value to the same Observable. That Observable is then flattened into the output Observable in the same way as with switchMap.
It takes up to 2 arguments. The first is the Observable that replaces each emitted value of the source Observable.
The second is an optional resultSelector function, which lets us select the value from the new Observable.
For example, we can use it as follows:
import { of, interval } from "rxjs";
import { switchMapTo, take } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMapTo(interval(1000).pipe(take(3))));
switched.subscribe(x => console.log(x));
The code above maps each value emitted by the of(1, 2, 3) Observable to the interval(1000).pipe(take(3)) Observable, which emits the values 0 to 2 spaced 1 second apart.
Because of(1, 2, 3) emits all of its values synchronously, only the inner Observable subscribed for the last value survives, so we get 0, 1 and 2 as the output.
window
The window operator branches out the source Observable values as a nested Observable whenever the windowBoundaries Observable emits.
It takes one argument, the windowBoundaries Observable, which is used to complete the previous window and start a new one.
Like buffer, it groups the emitted values until some condition is met, but it emits each group as an Observable instead of an array.
For instance, we can use it as follows:
import { interval, timer } from "rxjs";
import { window, mergeAll, map, take } from "rxjs/operators";
const timer$ = timer(3000, 1000);
const sec = interval(6000);
const result = timer$.pipe(
  window(sec),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));
The code above has the timer(3000, 1000) Observable, which emits values every second, starting 3 seconds after it’s been subscribed to.
We also have a sec Observable that emits numbers every 6 seconds. We use that for windowing with the window operator. This means that the values emitted by the timer$ Observable are piped to the window operator, which starts a new window each time sec emits.
Each window is then piped to the map operator, which takes the first 2 values emitted in that window. Then all the results are merged together with mergeAll.
In the end, we get the first 2 numbers that timer$ emits in each 6-second window.
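The windowing in the example above can be sketched without timers. This plain-JavaScript model is not RxJS code: the function firstPerWindow and its event shape are hypothetical, and it assumes that a value due at exactly the same moment as a boundary lands in the new window, which real timers do not guarantee.

```javascript
// Hypothetical model of window + take(count) + mergeAll (not RxJS code).
// events: [{ time, value }] sorted by time; boundaries: sorted times in ms.
function firstPerWindow(events, boundaries, count) {
  const windows = [[]];
  let b = 0;
  for (const event of events) {
    // Assumption: a value due exactly at a boundary lands in the new window.
    while (b < boundaries.length && boundaries[b] <= event.time) {
      windows.push([]);
      b += 1;
    }
    windows[windows.length - 1].push(event.value);
  }
  // take(count) keeps the first values of each window; mergeAll flattens.
  return windows.flatMap(win => win.slice(0, count));
}

// Models timer(3000, 1000): value n is due at 3000 + n * 1000 ms.
const events = Array.from({ length: 15 }, (_, n) => ({
  time: 3000 + n * 1000,
  value: n
}));
// Models the first two emissions of interval(6000).
const boundaries = [6000, 12000];

const firstTwo = firstPerWindow(events, boundaries, 2);
console.log(firstTwo); // [0, 1, 3, 4, 9, 10]
```

Under that assumption, the first window only holds the values 0 to 2 because timer$ stays silent for the first 3 seconds, while later windows each hold 6 values, of which the first 2 are kept.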
The scan operator applies an accumulator function over the source Observable by combining the emitted values. It emits each intermediate result and accepts an optional seed value.
switchMap projects each source value to an Observable, which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.
Like switchMap, switchMapTo projects each source value to an Observable. But unlike switchMap, it projects to the same Observable every time. That Observable is then flattened into the output Observable in the same way as with switchMap.
The window operator branches out the source Observable values as a nested Observable whenever the windowBoundaries Observable, which is used for closing and opening the windows, emits.