
More Rxjs Transformation Operators — Window

RxJS is a library for reactive programming. Transformation operators take the values emitted by a source Observable and turn them into a different form, such as windows of values that are emitted as nested Observables.

In this article, we’ll look at some window operators, including the windowCount, windowTime, windowToggle and windowWhen operators.

windowCount

The windowCount operator branches out the source Observable values as nested Observables, with each of them emitting at most windowSize values.

It takes up to 2 arguments. The first argument is the windowSize, which is the maximum number of values to be emitted by each window.

The second argument is optional. It’s the startWindowEvery number, which defaults to 0. It’s the interval at which to start a new window, measured by the number of items emitted by the source Observable. With the default of 0, a new window starts as soon as the previous one completes.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));  
const result = nums.pipe(  
  windowCount(3, 3),  
  map(win => win.pipe(skip(1))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable, which emits a number every second until 1000 numbers have been emitted.

That’s piped to the windowCount operator, which groups the values into windows of 3 and starts a new window after every 3 values emitted from nums.

Then each window is mapped to the win.pipe(skip(1)) Observable, which skips the first value of that window.

Finally, we pipe the values to mergeAll to merge all the Observables into one.

Then we should see that every third number, starting with 0, isn’t emitted.
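To make the window boundaries easier to see, here is a minimal sketch that models them with plain arrays instead of Observables. It is not RxJS code: windowCountModel is a hypothetical helper written for this article, and it assumes that a startWindowEvery of 0 means that a new window starts when the previous one is full.

```javascript
// Hypothetical plain-array model of windowCount. Not part of RxJS.
function windowCountModel(values, windowSize, startWindowEvery = 0) {
  // Assumption: 0 means "start a new window every windowSize values".
  const startEvery = startWindowEvery > 0 ? startWindowEvery : windowSize;
  const all = [[]]; // every window ever opened, in opening order
  const open = [all[0]]; // windows that still accept values
  let count = 0;
  for (const value of values) {
    // Every open window receives the value.
    for (const win of open) win.push(value);
    // Close the oldest window once it holds windowSize values.
    const c = count - windowSize + 1;
    if (c >= 0 && c % startEvery === 0) open.shift();
    count += 1;
    // Open a new window every startEvery values.
    if (count % startEvery === 0) {
      const win = [];
      all.push(win);
      open.push(win);
    }
  }
  return all;
}

// Same arguments as the example above: windows of 3, a new one every 3 values.
const aligned = windowCountModel([0, 1, 2, 3, 4, 5, 6], 3, 3);
console.log(aligned); // [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6 ] ]

// Dropping the first value of each window mimics skip(1) plus mergeAll.
const logged = aligned.flatMap(win => win.slice(1));
console.log(logged); // [ 1, 2, 4, 5 ]

// A startWindowEvery smaller than windowSize makes the windows overlap.
const overlapping = windowCountModel([0, 1, 2, 3, 4], 3, 2);
console.log(overlapping); // [ [ 0, 1, 2 ], [ 2, 3, 4 ], [ 4 ] ]
```

In this model 0 and 3 never reach logged, which is the "every third number" that goes missing in the example.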

windowTime

The windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan.

Its first argument is the windowTimeSpan. The remaining arguments are optional: the windowCreationInterval, which is how often a new window is started, the maxWindowSize, and a scheduler object.

An example would be the following:

import { interval } from "rxjs";  
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);  
const result = nums.pipe(  
  windowTime(1000, 5000),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable, which emits values starting from 0 every second. Then the emitted values are piped to the windowTime operator, which starts a window every 5 seconds that’s 1 second long. Then we take at most 2 values from each window.

Since nums emits every second but a window is only open for 1 second out of every 5, most of the values fall between windows and are dropped.
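The timing can be sketched with plain arrays and timestamps. The code below is not RxJS code: windowTimeModel is a hypothetical helper, and it assumes that a window includes a value that arrives at the instant it opens and excludes one that arrives at the instant it closes. Real timers may order such ties differently, so the exact numbers logged by the RxJS example can differ from this model.

```javascript
// Hypothetical timestamp model of windowTime(windowTimeSpan, windowCreationInterval).
// Not part of RxJS. Each event is { time, value }, with time in milliseconds.
function windowTimeModel(events, windowTimeSpan, windowCreationInterval, until) {
  const windows = [];
  // A new window opens every windowCreationInterval milliseconds.
  for (let start = 0; start < until; start += windowCreationInterval) {
    const end = start + windowTimeSpan;
    // Assumption: a window covers the half-open range [start, end).
    windows.push(
      events.filter(e => e.time >= start && e.time < end).map(e => e.value)
    );
  }
  return windows;
}

// interval(1000) emits 0 at 1000 ms, 1 at 2000 ms, and so on.
const secondTicks = Array.from({ length: 12 }, (_, n) => ({
  time: (n + 1) * 1000,
  value: n
}));

const timeWindows = windowTimeModel(secondTicks, 1000, 5000, 12000);
console.log(timeWindows); // [ [], [ 4 ], [ 9 ] ]

// take(2) on each window followed by mergeAll.
const timeLogged = timeWindows.flatMap(win => win.slice(0, 2));
console.log(timeLogged); // [ 4, 9 ]
```

Under these assumptions only about 1 value in 5 lands inside a window, so take(2) never has anything to cut off.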

windowToggle

windowToggle branches the source Observable values into nested Observables, with each window starting when openings emits and ending when the Observable returned by closingSelector emits.

It takes up to 2 arguments. The first is openings, which is an Observable of notifications to start a new window.

The second is the closingSelector, which takes the value emitted by openings and returns an Observable whose next or complete signal closes the window.

We can use it as follows:

import { interval, EMPTY } from "rxjs";  
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);  
const openings = interval(2000);  
const result = interval$.pipe(  
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

We have the interval$ Observable, which emits a number every 2 seconds. Then we have an identical Observable for openings. The values emitted by interval$ are piped to the windowToggle operator, which has the openings Observable as the first argument. Since openings emits every 2 seconds, we start a new window every 2 seconds.

Then we have the closingSelector function as the second argument:

i => (i % 3 === 0 ? interval(500) : EMPTY)

It keeps the window open for 500 milliseconds when the value emitted by openings is divisible by 3, and closes the window right away otherwise. This means that we should see only every third value emitted from interval$ logged.
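Here is a minimal sketch of that behavior with plain arrays and timestamps. It is not RxJS code: windowToggleModel is a hypothetical helper, a closing delay of 0 stands in for EMPTY, and the model assumes that when openings and the source emit at the same instant, the window opens before the source value arrives.

```javascript
// Hypothetical timestamp model of windowToggle. Not part of RxJS.
// Each event and opening is { time, value }, with time in milliseconds.
// closeAfter(value) returns how long the window stays open (0 stands in for EMPTY).
function windowToggleModel(events, openings, closeAfter) {
  return openings.map(({ time, value }) => {
    const end = time + closeAfter(value);
    // Assumption: a window covers the half-open range [time, end),
    // so a delay of 0 gives a window that closes before receiving anything.
    return events.filter(e => e.time >= time && e.time < end).map(e => e.value);
  });
}

// interval(2000) emits 0 at 2000 ms, 1 at 4000 ms, and so on.
const twoSecondTicks = Array.from({ length: 7 }, (_, n) => ({
  time: (n + 1) * 2000,
  value: n
}));

// The same ticks serve as both the source and the openings.
const toggled = windowToggleModel(twoSecondTicks, twoSecondTicks, i =>
  i % 3 === 0 ? 500 : 0
);
console.log(toggled); // [ [ 0 ], [], [], [ 3 ], [], [], [ 6 ] ]

// mergeAll flattens the windows into one stream.
const toggleLogged = toggled.flat();
console.log(toggleLogged); // [ 0, 3, 6 ]
```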

windowWhen

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.

It takes the closingSelector function, which takes no arguments and returns an Observable whose next or complete signal closes the current window and opens a new one.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);  
const result = interval$.pipe(  
  windowWhen(() => interval(Math.random() * 4000)),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

In the code above, we have interval$, which emits a number every 2 seconds. Then the emitted values are piped to the windowWhen operator, which has the following closingSelector function:

() => interval(Math.random() * 4000)

This means that each window will close, and a new one will open, after Math.random() * 4000 milliseconds.

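Since interval$ emits every 2 seconds and each window lasts less than 4 seconds, a window receives at most 2 values, so take(2) doesn’t drop anything and we should see the numbers logged about every 2 seconds.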
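The back-to-back windows can be sketched with plain arrays and timestamps. The code below is not RxJS code: windowWhenModel is a hypothetical helper, and the durations are made-up samples standing in for Math.random() * 4000, fixed here so that the result is repeatable.

```javascript
// Hypothetical timestamp model of windowWhen. Not part of RxJS.
// Each event is { time, value }; durations lists how long each window lasts.
function windowWhenModel(events, durations) {
  const windows = [];
  let start = 0;
  for (const duration of durations) {
    const end = start + duration;
    // Assumption: a window covers the half-open range [start, end).
    windows.push(
      events.filter(e => e.time >= start && e.time < end).map(e => e.value)
    );
    // The next window opens at the instant the previous one closes.
    start = end;
  }
  return windows;
}

// interval(2000) emits 0 at 2000 ms, 1 at 4000 ms, and so on.
const sourceTicks = Array.from({ length: 5 }, (_, n) => ({
  time: (n + 1) * 2000,
  value: n
}));

// Made-up window lengths, each below 4000 ms.
const sampleDurations = [3500, 1000, 3900, 2500];

const whenWindows = windowWhenModel(sourceTicks, sampleDurations);
console.log(whenWindows); // [ [ 0 ], [ 1 ], [ 2, 3 ], [ 4 ] ]

// take(2) on each window followed by mergeAll.
const whenLogged = whenWindows.flatMap(win => win.slice(0, 2));
console.log(whenLogged); // [ 0, 1, 2, 3, 4 ]
```

In this model no window holds more than 2 values, because each window is shorter than 2 source periods.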

The windowCount operator branches out the source Observable values as nested Observables, with each of them emitting at most windowSize values. windowSize is the size of each window.

The windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan. windowTimeSpan sets the amount of time a window is open.

windowToggle branches the source Observable values into nested Observables, with each window starting when openings emits and ending when the Observable returned by closingSelector emits. openings is an Observable that controls when windows open, and closingSelector is a function that returns an Observable that controls when each window closes.

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.
