RxJS is a library for reactive programming. Transformation operators change the values emitted by a source Observable before they reach the Observers subscribed to it.
In this article, we’ll look at some transformation operators: mergeScan, pairwise, partition, and pluck.
mergeScan
The mergeScan operator applies an accumulator function over the source Observable. The accumulator function itself returns an Observable, and each intermediate Observable it returns is merged into the output Observable.
It takes up to 3 arguments. The first is an accumulator function that’s called on each source value.
The second argument is the seed parameter, which takes the initial accumulation value.
The last argument is optional and sets the maximum number of inner Observables to subscribe to concurrently. By default, there’s no limit.
For example, we can use it as follows:
import { of, interval } from "rxjs";
import { mapTo, mergeScan } from "rxjs/operators";
const interval$ = interval(5000);
const one$ = interval$.pipe(mapTo(1));
const seed = 0;
const count$ = one$.pipe(mergeScan((acc, one) => of(acc + one), seed));
count$.subscribe(x => console.log(x));
The code above has the interval$ Observable, which emits a value every 5 seconds. Each value is mapped to 1 with the mapTo operator, and the seed initial value is set to 0.
Then mergeScan passes each emitted 1 to the accumulator function, which adds it to the accumulated value and returns the sum wrapped in an Observable with of. When we subscribe to count$, we get 1, 2, 3, and so on, with a new number logged every 5 seconds.
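To see how the accumulated value moves through mergeScan, here is a minimal plain-JavaScript sketch that models the operator on arrays. It is not how RxJS implements it: mergeScanModel is a hypothetical helper, arrays stand in for Observables, and everything runs synchronously, so the concurrent argument is not modeled.

```javascript
// Hypothetical array-based model of mergeScan (not the RxJS implementation).
// `source` stands in for the source Observable, and the array returned by
// `accumulator` stands in for the inner Observable it would return.
function mergeScanModel(source, accumulator, seed) {
  const output = [];
  let acc = seed;
  for (const value of source) {
    for (const innerValue of accumulator(acc, value)) {
      acc = innerValue; // each inner value becomes the new accumulated value
      output.push(innerValue); // and is also emitted on the output
    }
  }
  return output;
}

// Three source values of 1, like the first three ticks of one$ above.
const counts = mergeScanModel([1, 1, 1], (acc, one) => [acc + one], 0);
console.log(counts); // [ 1, 2, 3 ]
```

The point of the sketch is that the value passed back into the accumulator is the latest value emitted by an inner Observable, not the inner Observable itself.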
pairwise
pairwise groups pairs of consecutive emissions from the source Observable together, then emits each pair as an array of 2 values.
It takes no arguments.
For example, we can use it as follows:
import { of } from "rxjs";
import { pairwise } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6);
const pair$ = of$.pipe(pairwise());
pair$.subscribe(x => console.log(x));
The code above groups the emitted values from the of$ Observable into pairs with the pairwise() operator, then emits each pair. We get the following output from the console.log:
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
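The pairing rule can be sketched in plain JavaScript on an array. This is only an illustrative model, not the RxJS implementation, and pairwiseModel is a hypothetical helper name. It shows that n values produce n - 1 pairs, and that a single value produces no pair at all.

```javascript
// Hypothetical array-based model of pairwise (not the RxJS implementation).
function pairwiseModel(values) {
  const pairs = [];
  for (let i = 1; i < values.length; i++) {
    pairs.push([values[i - 1], values[i]]); // [previous, current]
  }
  return pairs;
}

const modelPairs = pairwiseModel([1, 2, 3, 4, 5, 6]);
console.log(modelPairs.length); // 5

// A single value has no predecessor, so no pair is produced.
console.log(pairwiseModel([1]).length); // 0

// Pairs make it easy to compare each value with the one before it.
const differences = pairwiseModel([1, 4, 9, 16]).map(
  ([previous, current]) => current - previous
);
console.log(differences); // [ 3, 5, 7 ]
```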
partition
The partition operator splits the source Observable into 2 Observables, which it returns in an array. One has the values that satisfy a predicate and the other has the values that don’t.
It takes up to 2 arguments. The first is the predicate, which is a function that evaluates each value emitted by the source Observable. If the function returns true, the value is emitted by the first Observable in the returned array. Otherwise, it’s emitted by the second Observable in that array.
The second is the thisArg, which lets us set the value of this inside the predicate function.
For instance, we can use it as follows:
import { of } from "rxjs";
import { partition } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
const parts = of$.pipe(partition(x => x % 2 === 0));
const [even, odd] = parts;
odd.subscribe(x => console.log(`odd numbers: ${x}`));
even.subscribe(x => console.log(`even numbers: ${x}`));
In the code above, we have the of$ Observable with some numbers. Then we pipe it into the partition operator, which takes a predicate function that checks whether a number emitted from the of$ Observable is even.
parts will be set to the array of 2 Observables returned from the partition operator. The even Observable emits the even numbers, and the odd Observable emits the odd numbers.
Then we get the following output from the odd Observable:
odd numbers: 1
odd numbers: 3
odd numbers: 5
odd numbers: 7
odd numbers: 9
Then we get the following output from the even Observable:
even numbers: 2
even numbers: 4
even numbers: 6
even numbers: 8
even numbers: 10
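The splitting rule can also be sketched on a plain array. This is an illustrative model only, not the RxJS implementation, and partitionModel is a hypothetical helper name. As with the operator, the values that pass the predicate land in the first element of the returned array and the rest land in the second.

```javascript
// Hypothetical array-based model of partition (not the RxJS implementation).
function partitionModel(values, predicate) {
  const matching = [];
  const rest = [];
  values.forEach((value, index) => {
    if (predicate(value, index)) {
      matching.push(value); // goes to the first returned array
    } else {
      rest.push(value); // goes to the second returned array
    }
  });
  return [matching, rest];
}

const [evens, odds] = partitionModel(
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
  x => x % 2 === 0
);
console.log(evens); // [ 2, 4, 6, 8, 10 ]
console.log(odds); // [ 1, 3, 5, 7, 9 ]
```

Every source value ends up in exactly one of the two results, so nothing is dropped or duplicated by the split.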
pluck
The pluck operator maps each emitted value from the source Observable to its specified nested property.
It takes one or more property names as arguments, which together describe the path to the nested property to pluck from each source value.
For example, we can use it as follows:
import { of } from "rxjs";
import { pluck } from "rxjs/operators";
const people = [
{ name: { firstName: "Mary" }, age: 10, gender: "female" },
{ name: { firstName: "Joe" }, age: 11, gender: "male" },
{ name: { firstName: "Amy" }, age: 10, gender: "female" }
];
const people$ = of(...people);
const plucked = people$.pipe(pluck("name", "firstName"));
plucked.subscribe(x => console.log(x));
The code above has a people array where each entry has a name object. We emit the entries with the of function, then pipe them into pluck to get the nested property firstName, which is inside name.
Then we should get:
Mary
Joe
Amy
as the output on the last line.
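The property lookup can be sketched in plain JavaScript as a walk along the given path. This is an illustrative model rather than the RxJS implementation: pluckModel and the samplePeople data are hypothetical. It also shows the case where part of the path is missing, which yields undefined instead of throwing.

```javascript
// Hypothetical array-based model of pluck (not the RxJS implementation).
// `path` is the list of property names to follow, outermost first.
function pluckModel(values, ...path) {
  return values.map(value =>
    // Follow one property per step; stop at undefined if a step is missing.
    path.reduce((current, key) => current?.[key], value)
  );
}

// Made-up sample data; the last entry has no name property.
const samplePeople = [
  { name: { firstName: "Mary" }, age: 10 },
  { name: { firstName: "Joe" }, age: 11 },
  { age: 10 }
];

const firstNames = pluckModel(samplePeople, "name", "firstName");
console.log(firstNames); // [ 'Mary', 'Joe', undefined ]
```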
mergeScan applies an accumulator function over the source Observable, where the accumulator function itself returns an Observable. Each intermediate Observable returned is merged into the output Observable.
pairwise groups pairs of consecutive emissions from the source Observable together, then emits each pair as an array of 2 values.
The partition operator splits the source Observable into 2, where one has the values that satisfy the predicate function and the other has the values that don’t.
The pluck operator maps each emitted value from the source Observable to the value of the specified nested property.