Categories
JavaScript Rxjs

More Rxjs Transformation Operators — Window

RxJS is a library for reactive programming. Creating operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some window operators, including the windowCount, windowTime, windowToggle and windowWhen operators.

windowCount

The windowCount operator branch out the source Observable values as a nested Observable with each of them emitting at most windowSize events.

It takes up to 2 arguments. The first argument is the windowSize, which the maximum number of values to be emitted by each window.

The second argument is optional. It’s the startWindowEvery number, which defaults to 0. It’s the interval to start a new window. The interval is measured by the number of items emitted by the source Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));  
const result = nums.pipe(  
  windowCount(3, 3),  
  map(win => win.pipe(skip(1))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable which emits a number every second up to 1000.

That’s pipe d to the windowCount operator which emits 3 values at a time and starts a new window after 3 values emitted from nums .

Then that is map ped to the win.pipe(skip(1)) Observable which skips 1 value for every 2 values emitted.

Finally, we pipe the values to mergeAll to merge all the Observables into one.

Then we should see that every third number isn’t emitted.

windowTime

The windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan.

It takes up to 2 arguments, which is the windowTimeSpan, and the second is an optional argument, which is a scheduler object.

An example would the following:

import { interval } from "rxjs";  
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);  
const result = nums.pipe(  
  windowTime(1000, 5000),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable, which emits values start from 0 every second. Then the emitted values are pipe d to the windowTime operator, which starts a window every 5 seconds that’s 1 second long. Then we take 2 values from each window

This will result in 3 values being skipped in each window since values are emitted every minute by nums but we take only 2 values from every window.

windowToggle

windowToggle branches the source Observable values as nested Observable starting from emitting from openings and ending with the closingSelector emits.

It takes up to 2 arguments. The first is the openings , which is an Observable of notifications to start a new window.

The second is the closingSelector which takes the value emitted by the openings and returns an Observable which emits the next or complete signal will close the window.

We can use it as follows:

import { interval, EMPTY } from "rxjs";  
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);  
const openings = interval(2000);  
const result = interval$.pipe(  
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

We have interval$ Observable which emits a number every 2 seconds. Then we have the same Observable for openings . The emitted values for interval$ are pipe d to the windowToggle operator, which has the openings Observable as the first argument, which emits every 2 seconds. So we start a new window every 2 seconds.

Then we have the second function:

i => (i % 3 === 0 ? interval(500) : EMPTY)

to close the window when the value piped in isn’t divisible by 3. This means that we get every 3 values emitted from interval$ logged.

windowWhen

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.

It takes the closingSelector function which takes the value emitted by the openings and returns an Observable which emits the next or complete signal will close the window.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);  
const result = interval$.pipe(  
  windowWhen(() => interval(Math.random() * 4000)),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

In the code above, we have the interval$ which emits a number every 2 seconds. Then the emitted value is pipe d to the windowWhen operator, which has the closingSelector ve:

() => interval(Math.random() \* 4000)

This means the window will close and reopen Math.random() * 4000 milliseconds.

We should be that some numbers are emitted faster than others.

The windowCount operator branch out the source Observable values as nested Observable with each of them emitting at most windowSize events. windowSize is the size of each window.

windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan . windowTimeSpan sets the amount of time a window is open.

windowToggle branches the source Observable values as nested Observable starting from emitting from openings and ending with the closingSelector emits. The openings and closingSelector functions are Observables that control the opening and closing of the window for each Observable respectively.

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.

Categories
JavaScript Rxjs

More Rxjs Transformation Operators — Scan and Window

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at how to use transformation operators scan , switchMap , switchMapTo and window .

scan

The scan operator applies an accumulator function over the source Observation by combining the emitted values. Then it returns each intermediate result, with an optional seed value.

It takes up to 2 arguments. The first is the accumulator function, which is the function that combines the value that was accumulated so far with the new emitted value.

The second is an optional argument, which is the seed , or the initial accumulation value.

For example, we can use it as follows:

import { of } from "rxjs";  
import { scan } from "rxjs/operators";
const of$ = of(1, 2, 3);  
const seed = 0;  
const count = of$.pipe(scan((acc, val) => acc + val, seed));  
count.subscribe(x => console.log(x));

The code above starts with an of Observable and a seed initial value of 0. Then we pipe the values from the of$ Observable to the scan operator, which has the accumulation function, which adds the accumulated value to the newly emitted value. We also set seed to 0.

Then we subscribe to the count Observable that’s results from this.

switchMap

The switchMap operator projects each source value to an Observable which is then merged into one output Observable. Only the values from the most recently projected Observable is emitted.

It takes up to 2 arguments. The first is a project function which takes the emitted value of the source Observable as a parameter and then returns a new Observable from it.

The second is an optional resultSelector argument. It’s a function that lets us select the result from the emitted values of the new Observable.

We can use it as follows:

import { of } from "rxjs";  
import { switchMap } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMap(x => of(x, x * 2, x * 3)));  
switched.subscribe(x => console.log(x));

The code above takes the values emitted from the of(1, 2, 3) Observable and then pipe the result into the switchMap operator, which has a function which maps the value emitted from the of(1, 2, 3) Observable and return of(x, x * 2, x * 3) , where x is 1, 2 or 3 from the of(1, 2, 3) Observable.

This means that we get 1, 1*2 and 1*3 which are 1, 2 and 3. Then the same is done with 2, so we get 2 , 2*2 and 2*3 , which are 2, 4 and 6. Finally, we get 3 , 3*2 , and 3*3 , which 3, 6 and 9.

So we get the following output:

1  
2  
3  
2  
4  
6  
3  
6  
9

switchMapTo

The switchMapTo operator projects each source value to the same Observable. The Observable then is flatten multiple times with switchMap in the output Observable.

It takes up to 2 arguments. The first is an Observable which we replace each emitted value of the source Observable with.

The second is an optional argument, which is the resultSelector function which lets us select the value from the new Observable.

For example, we can use it as follows:

import { of, interval } from "rxjs";  
import { switchMapTo, take } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMapTo(interval(1000).pipe(take(3))));  
switched.subscribe(x => console.log(x));

The code above will map the emitted values of the of(1, 2, 3) Observable into the interval(1000).pipe(take(3)) which emits values from 0 to 3 spaced 1 second apart.

The result will be that we get 0, 1, 2, and 3 as the output.

window

The window operator branch out the source Observable values as a nested Observable when the windowBoundaries Observable emits.

It takes one argument, which is the windowBoundaries Observable that’s used to complete the previous window and starts a new window.

Like buffer it buffers the emitted values then emits them all at once some condition is met but emits an Observable instead of an array.

For instance, we can use it as follows:

import { interval, timer } from "rxjs";  
import { window, mergeAll, map, take } from "rxjs/operators";
const timer$ = timer(3000, 1000);  
const sec = interval(6000);  
const result = timer$.pipe(  
  window(sec),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the timer(3000, 1000) Observable which emits values every second starting 3 seconds after it’s been initialized.

We also have a sec Observable that emits numbers every 6 seconds. We use that for windowing with the window operator. This means that value from the timer$ Observable will emit values, which will then be pipe d to the window operator.

This will then be piped to the map operator, which will take the first 2 values that were emitted from the window operator. Then all the results will be merged together with mergeAll .

In the end, we get 2 numbers from timer$ emitted every 6 seconds.

The scan operator applies an accumulator function over the source Observation by combining the emitted values. Then it returns each intermediate result, with an optional seed value.

switchMap projects each source value to an Observable which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.

Like switchMap,switchMapTo projects each source value an Observable. But unlike switchMap, it projects to the same Observable. The Observable then is flatten multiple times with switchMap in the output Observable.

The window operator branch out the source Observable values as a nested Observable when the windowBoundaries Observable, which is used for closing and opening the window emits.

Categories
JavaScript Rxjs

More Rxjs Transformation Operators — mergeScan and Pluck

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some transformation operators like mergeScan , pairwise , partition , and pluck operators.

mergeScan

The mergeScan operator applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, where each intermediate Observable returned is merged into the output Observable.

It takes up to 3 arguments. The first is an accumulator function that’s called on each source value.

The second argument is the seed parameter, which takes the initial accumulation value.

The last argument is an optional argument which takes the maximum number of concurrent input Observables to be subscribed to concurrently.

For example, we can use it as follows:

import { of, interval } from "rxjs";  
import { mapTo, mergeScan } from "rxjs/operators";
const interval$ = interval(5000);  
const one$ = interval$.pipe(mapTo(1));  
const seed = 0;  
const count$ = one$.pipe(mergeScan((acc, one) => of(acc + one), seed));  
count$.subscribe(x => console.log(x));

The code above has the interval$ Observable which is mapped to the value 1 with the mapTo operator. Then we set the seed initial value to 0.

Then we send the emitted values of the interval$ Observable to the accumulator function via the mergeScan operator, which keeps adding 1 as the values from interval$ is emitted. Then we get the emitted number from $count when we subscribe to count$ .

pairwise

pairwise groups pairs of consecutive emissions from the source Observable together then emit them as an array 2 values.

It takes on arguments.

For example, we can use it as in the following example:

import { of } from "rxjs";  
import { pairwise } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6);  
const pair$ = of$.pipe(pairwise());  
pair$.subscribe(x => console.log(x));

The code above will group the emitted values from the of$ Observable into pairs with the pairwise() operator then emitted the values. Then we get the following output from the console.log :

[1, 2]  
[2, 3]  
[3, 4]  
[4, 5]  
[5, 6]

partition

The partition operator splits the source Observable into 2, where one has the values that satisfy a predicate and the other with values that doesn’t.

It takes up to 2 arguments. The first is the predicate , which is a function that evaluates each value emitted by the source Observable. Then if the function returns true , then the value emitted on the first return Observable in the emitted array. Otherwise, it’s emitted in the second Observable with that emitted array.

The second is the thisArg , which lets us set the value of this inside the predicate function.

For instance, we can use it as follows:

import { of } from "rxjs";  
import { partition } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);  
const parts = of$.pipe(partition(x => x % 2 === 0));  
const [even, odd] = parts;  
odd.subscribe(x => console.log(`odd numbers: ${x}`));  
even.subscribe(x => console.log(`even numbers: ${x}`));

In the code above, we have the of$ Observable with some numbers. Then we pipe it into the partition operator, which takes a predicate function that checks whether a number emitted from the of$ Observable is even.

parts will be set to the 2 Observables returned from the partition operator. The even Observable has the even numbers emitted, and the odd Observable has odd numbers emitted.

Then we get the following output from the odd Observable:

odd numbers: 1  
odd numbers: 3  
odd numbers: 5  
odd numbers: 7  
odd numbers: 9

Then we get the following output from the even Observable:

even numbers: 2  
even numbers: 4  
even numbers: 6  
even numbers: 8  
even numbers: 10

pluck

The pluck operator map each emitted value from the source Observable to its specified nested property.

It takes one argument which is the properties object, which are the nested properties to pluck from each source value.

For example, we can use it as follows:

import { of } from "rxjs";  
import { pluck } from "rxjs/operators";
const people = [  
  { name: { firstName: "Mary" }, age: 10, gender: "female" },  
  { name: { firstName: "Joe" }, age: 11, gender: "male" },  
  { name: { firstName: "Amy" }, age: 10, gender: "female" }  
];  
const people$ = of(...people);  
const plucked = people$.pipe(pluck("name", "firstName"));  
plucked.subscribe(x => console.log(x));

The code above, has a people array where each entry has a name object. We emit the values with the of operator and then pipe the emitted values and use pluck to get the nested property firstName , which is inside name .

Then we should get:

Mary  
Joe  
Amy

as the output on the last line.

mergeScan applies an accumulator function over the source Observable where the accumulator function itself returns an Observable. Then each intermediate Observable returned is merged into the output Observable and emitted together.

pairwise groups pairs of consecutive emissions from the source Observable together then emit them as an array 2 values.

The partition operator splits the source Observable into 2, where one has the values that satisfy the condition returned in the predicate function and the other with values that doesn’t.

The pluck operator map each emitted value from the source Observable the value of the specified nested property.

Categories
JavaScript Rxjs

Some Useful Rxjs Transformation Operators

RxJS is a library for reactive programming. Creating operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some RxJS transformation operators like bufferTime, bufferToggle, bufferWhen, and concatMap operators.

bufferTime

The bufferTime operator buffers the emitted data of the originating Observable for a specific time period.

It takes one argument, the bufferTimeSpan, which is the amount of time to fill each buffer array. The unit for the time span is milliseconds.

The buffer data is emitted and the buffer is reset once the amount of time specified is up.

It also takes a bufferCreationInterval argument which specifies the time span for which the buffer data builds up. The operator opens the buffer every bufferCreationInterval milliseconds and emits and resets every bufferTimeSpan milliseconds.

Another optional argument for this operator is the maxBufferSize, which is a number that specifies the maximum size of items buffered.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { bufferTime } from "rxjs/operators";

const clicks = interval(2000);  
const buffered = clicks.pipe(bufferTime(1000));  
buffered.subscribe(x => console.log(x));

We should see a new number emitted every 2 seconds in the arrays that are emitted, while the rest of the emitted arrays are empty.

The bufferCreationInterval argument can be used as follows:

import { interval } from "rxjs";  
import { bufferTime } from "rxjs/operators";

const clicks = interval(2000);  
const buffered = clicks.pipe(bufferTime(1000, 1000));  
buffered.subscribe(x => console.log(x));

In the code above, the buffered data is emitted every second, and the buffer is created every second.

bufferToggle

bufferToggle buffers the source Observable values starting from the emission of openings and ending when the output of the closingSelector emits.

Values emitted by the originating Observable is buffered until the closingSelector tells us to stop emitting values from the originating Observable.

For example, if we have a button as follows:

<button>Click Me</button>

We can buffer the mouse click events on a button and the emit the MouseEvent objects that were buffered according to the closingSelector function’s specification as follows:

import { fromEvent, interval, EMPTY } from "rxjs";  
import { bufferToggle } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("button"), "click");  
const openings = interval(1000);  
const buffered = clicks.pipe(  
  bufferToggle(openings, i => (i % 2 ? interval(1500) : EMPTY))  
);  
buffered.subscribe(x => console.log(x));

The closingSelector in the example above is:

(i % 2 ? interval(1500) : EMPTY)

The code will emit data from the openings Observable when it starts emitting, which is when we click on our button, ever 1.5 seconds. The emitted data is buffered into an array and then the buffering ends when i % 2 is false, and the interval(1500) Observable is returned signaling closure. It’ll continue buffer when EMPTY is emitted.

bufferWhen

The bufferWhen operator buffers data from the source Observable until the closingSelector function closes the buffer.

It takes one argument, which is the closingSelector function to specify when the buffer will be closed.

For example, we can use it as we do in the following code:

import { fromEvent, interval } from "rxjs";  
import { bufferWhen } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("button"), "click");  
const buffered = clicks.pipe(  
  bufferWhen(() => interval(1000 + Math.random() * 4000))  
);  
buffered.subscribe(x => console.log(x));

What we did is get the button clicks and then emit the MouseEvent object array of the MouseEvent s from the clicks that are buffered.

Once we did, then the closingSelector specifies that we emit the buffered values every 1000 + Math.random() * 4000 milliseconds and empty the buffer and buffer the click events again.

concatMap

The concatMap operator takes each source value of the originating Observable and wait for one value of the originating Observable to emit before the next emitted value from it emits.

It takes 2 arguments. The first is a project function, which is a function that returns a new Observable with operations that we want to be applied to the values emitted from the originating Observable. The second argument is the resultSelector . It’s an optional argument, which is a function to pick the emitted values that we want to emit in the returned Observable.

For example, we can use it as follows:

import { fromEvent, interval, of } from "rxjs";  
import { concatMap, take } from "rxjs/operators";

const clicks = fromEvent(document, "click");  
const result = clicks.pipe(concatMap(ev => interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

The code above will receive the click events, then after 1 second emit 0, then after another second emit 1, up until it reaches 4.

It’ll do the same thing each time we click.

The bufferTime operator buffers the emitted data of the originating Observable for a specific time period, then the buffered data will be emitted as an array.

bufferToggle buffers the source Observable values starting from the emission of openings and ending when the output of the closingSelector function emits.

The bufferWhen operator buffers data from the source Observable until the closingSelector function emits its data.

Finally the concatMap operator takes each source value of the originating Observable and waits for one value of the originating Observable to emit before the next emitted value from it emits.

Categories
JavaScript Rxjs

More Rxjs Transformation Operators — Group and Map

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some transformation operators like groupBy , map , mapTo and mergeMap .

groupBy

The groupBy operator takes values emitted by the source Observable and then group them by the criteria that we set for them.

It takes 4 arguments. The first is the keySelector , which is a function that extracts the key for each item.

The second is an optional argument for the elementSelector , which is a function that extracts the return element for each item.

The 3rd argument is the durationSelector , which is an optional function that returns an Observable to determine how long each group should exist.

Finally, the last argument is the subjectSelector , which is an optional function that returns a Subject.

For example, we can use it as follows:

import { of } from "rxjs";  
import { groupBy, reduce, mergeMap } from "rxjs/operators";
const observable = of(  
  { id: 1, name: "John" },  
  { id: 2, name: "Jane" },  
  { id: 2, name: "Mary" },  
  { id: 1, name: "Joe" },  
  { id: 3, name: "Don" }  
).pipe(  
  groupBy(p => p.id),  
  mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))  
);

observable.subscribe(val => console.log(val));

In the code above, we called groupBy(p => p.id) to group the items emitted from the of Observable by id .

Then we have:

mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))

to get the grouped items together to be emitted by one Observable.

map

The map operator lets us map the values emitted by the source Observable to the other values and emits the resulting values as an Observable.

It takes up to 2 arguments. The first argument is the project function, which is required. The function takes the emitted value of the source Observable and the index of it and then returns what we want by manipulating those.

The second argument is optional. It’s the thisArg , which is used to define the this value for the project function in the first argument.

For example, we can use it as follows:

import { of } from "rxjs";  
import { map } from "rxjs/operators";
const observable = of(1, 2, 3);  
const newObservable = observable.pipe(map(val => val ** 2));  
newObservable.subscribe(x => console.log(x));

The code above takes the emitted values from observable , then pass it to the map operator via the pipe operator. In the callback, we passed into the map operator, we exponentiate the originally emitted value to the power of 2.

Then we can subscribe to an Observable that emits the new values and we get:

1  
4  
9

logged.

mapTo

The mapTo operator emits the given constant value for any source Observable’s emitted value.

It takes one argument, which is the value to emit.

For example, we can use it as follows:

import { of } from "rxjs";  
import { mapTo } from "rxjs/operators";
const observable = of(1, 2, 3);  
const newObservable = observable.pipe(mapTo("foo"));  
newObservable.subscribe(x => console.log(x));

The code above maps all the values from observable to the value 'foo' , so we get 'foo' 3 times instead of 1, 2 and 3.

mergeMap

The mergeMap operator takes the values emitted from a source Observable and then lets us combine it with the values of another Observable.

It takes up to 3 arguments. The first is a project function to project the values and return a new Observable.

The second argument is an optional argument that takes an resultSelector . We can pass in a function to select the values to emit from the result.

The third argument is the concurrency , which is an optional argument that specifies the maximum number of input Observables to be subscribed to concurrently. The default is Number.POSITIVE_INFINITY .

For example, we can use it as follows:

import { of } from "rxjs";  
import { mergeMap, map } from "rxjs/operators";
const nums = of(1, 2, 3);  
const result = nums.pipe(mergeMap(x => of(4, 5, 6).pipe(map(i => x + i))));  
result.subscribe(x => console.log(x));

The code above will get the values from the 3 values emitted from the nums Observable, then pass the values to the mergeMap ‘s callback function via the pipe operator. x will have the values from nums .

Then in the callback, we have the of(4, 5, 6) Observable, which have the values from combined from the nums Observable. i has the values from the of(4, 5, 6) Observable, so we the values from both Observables added together. We get 1 + 4, 1 + 5, 1 + 6, 2 + 4, 2 + 5, 2 + 6 and so on.

In the end, we should get the following output:

5  
6  
7  
6  
7  
8  
7  
8  
9

The groupBy operator takes values emitted by the source Observable and then group them by the criteria that we set for them. We can use it in conjunction with the mergeMap to combined the results grouped by the groupBy operator into one Observable.

The map operator lets us map the values emitted by the source Observable to the other values and emits the resulting values as an Observable.

The mapTo operator emits the given constant value for any source Observable’s emitted value.

Finally mergeMap operator takes the values emitted from a source Observable and then lets us combine it with the values of another Observable. Then we get an Observable with the values of both Observables combined together.