RxJS is a library for reactive programming. Its operators are useful for generating and transforming data from various sources to be subscribed to by Observers.
In this article, we’ll look at some window operators, including the windowCount, windowTime, windowToggle, and windowWhen operators.
windowCount
The windowCount operator branches the source Observable’s values into nested Observables (windows), each of which emits at most windowSize values.
It takes up to 2 arguments. The first argument is windowSize, which is the maximum number of values emitted by each window.
The second argument is optional. It’s the startWindowEvery number, which defaults to 0. It’s the interval at which to start a new window, measured by the number of items emitted by the source Observable. With the default of 0, a new window starts every windowSize values, so the windows don’t overlap.
For example, we can use it as follows:
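import { interval } from "rxjs";
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";

// Emit 0, 1, 2, ... once per second and stop after 1000 values.
const nums = interval(1000).pipe(take(1000));
const result = nums.pipe(
  windowCount(3, 3), // windows of 3 values, a new one every 3 values
  map(win => win.pipe(skip(1))), // drop the first value of each window
  mergeAll() // flatten the windows back into one stream
);
result.subscribe(x => console.log(x));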
The code above has the nums Observable, which emits a number every second until it has emitted 1000 values.
That’s piped to the windowCount operator, which groups the values into windows of 3 and starts a new window after every 3 values emitted from nums.
Then each window is mapped to win.pipe(skip(1)), which skips the first value of that window and emits the remaining 2.
Finally, we pipe the windows to mergeAll to merge all the Observables into one.
We should see that every third number, starting with 0, isn’t emitted: the output is 1, 2, 4, 5, 7, 8, and so on.
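To make the grouping concrete, here is a minimal sketch that models windowCount on a plain array instead of an Observable. The windowCountModel helper is hypothetical (it is not part of RxJS), and it ignores timing entirely; it only mirrors which values land in which window, including the empty trailing window that opens after the last full one.

```typescript
// Plain-array model of windowCount. windowCountModel is a hypothetical
// helper for illustration only, not an RxJS API.
function windowCountModel<T>(
  values: T[],
  windowSize: number,
  startWindowEvery: number = 0
): T[][] {
  // With the default of 0, a new window starts every windowSize values.
  const startEvery = startWindowEvery > 0 ? startWindowEvery : windowSize;
  const first: T[] = [];
  const allWindows: T[][] = [first]; // every window ever opened, in order
  const openWindows: T[][] = [first]; // windows still receiving values
  let count = 0;
  for (const value of values) {
    for (const win of openWindows) {
      win.push(value);
    }
    // Close the oldest window once it has received windowSize values.
    const sinceFull = count - windowSize + 1;
    if (sinceFull >= 0 && sinceFull % startEvery === 0) {
      openWindows.shift();
    }
    // Open a new window every startEvery values.
    count += 1;
    if (count % startEvery === 0) {
      const next: T[] = [];
      openWindows.push(next);
      allWindows.push(next);
    }
  }
  return allWindows;
}

const countWindows = windowCountModel([0, 1, 2, 3, 4, 5], 3, 3);
console.log(countWindows); // [[0, 1, 2], [3, 4, 5], []]

// Array equivalent of map(win => win.pipe(skip(1))) followed by mergeAll()
// for non-overlapping windows.
const afterSkip: number[] = [];
for (const win of countWindows) {
  afterSkip.push(...win.slice(1));
}
console.log(afterSkip); // [1, 2, 4, 5]
```

Passing a startWindowEvery smaller than windowSize, for example windowCountModel([0, 1, 2, 3], 2, 1), produces overlapping windows in this model.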
windowTime
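The windowTime operator returns an Observable that emits windows of items, with each window staying open for windowTimeSpan milliseconds.
It takes up to 4 arguments. The first is the windowTimeSpan. The optional second argument is the windowCreationInterval, which sets how often a new window is started. The optional third is maxWindowSize, which caps the number of values per window, and the last is an optional scheduler object.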
An example would be the following:
import { interval } from "rxjs";
import { windowTime, mergeAll, take, map } from "rxjs/operators";

const nums = interval(1000);
const result = nums.pipe(
  windowTime(1000, 5000), // a 1-second window, started every 5 seconds
  map(win => win.pipe(take(2))), // at most 2 values per window
  mergeAll() // flatten the windows back into one stream
);
result.subscribe(x => console.log(x));
The code above has the nums Observable, which emits values starting from 0 every second. The emitted values are piped to the windowTime operator, which starts a window every 5 seconds that’s 1 second long. Then we take at most 2 values from each window.
Since nums emits every second but a window is open for only 1 second out of every 5, most values arrive while no window is open and are dropped. Each 1-second window catches about 1 value, so the take(2) limit is rarely reached.
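The effect of a short window that opens only occasionally can be sketched with timestamps on a plain array. This is a rough model, not RxJS itself: windowTimeModel and TimedValue are hypothetical names, and the sketch assumes a value arriving exactly when a window opens is included while one arriving exactly when it closes is not. Real timer ordering may differ at those boundaries.

```typescript
// A value paired with the time (in ms) at which it is emitted.
interface TimedValue<T> {
  time: number;
  value: T;
}

// Plain-array model of windowTime(windowTimeSpan, windowCreationInterval).
// Hypothetical helper; assumes each window covers [open, open + span).
function windowTimeModel<T>(
  events: TimedValue<T>[],
  windowTimeSpan: number,
  windowCreationInterval: number
): T[][] {
  if (windowCreationInterval <= 0) {
    throw new Error("windowCreationInterval must be positive");
  }
  const lastTime = Math.max(...events.map(e => e.time));
  const windows: T[][] = [];
  for (let openAt = 0; openAt <= lastTime; openAt += windowCreationInterval) {
    const closeAt = openAt + windowTimeSpan;
    windows.push(
      events.filter(e => e.time >= openAt && e.time < closeAt).map(e => e.value)
    );
  }
  return windows;
}

// interval(1000) emits value i at (i + 1) * 1000 ms.
const ticks: TimedValue<number>[] = [];
for (let i = 0; i < 10; i++) {
  ticks.push({ time: (i + 1) * 1000, value: i });
}

const timeWindows = windowTimeModel(ticks, 1000, 5000);
console.log(timeWindows); // [[], [4], [9]]
```

Under these assumptions, only the values that arrive while a window is open survive, and no window comes close to the take(2) limit.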
windowToggle
windowToggle branches the source Observable’s values into nested Observables, with each window starting when openings emits and ending when the Observable returned by closingSelector emits.
It takes 2 arguments. The first is openings, which is an Observable of notifications to start a new window.
The second is the closingSelector, a function that takes the value emitted by openings and returns an Observable. When that Observable emits a next or complete signal, the window closes.
We can use it as follows:
import { interval, EMPTY } from "rxjs";
import { windowToggle, mergeAll } from "rxjs/operators";

const interval$ = interval(2000);
const openings = interval(2000);
const result = interval$.pipe(
  // Open a window on every openings value; keep it open for 500 ms
  // only when that value is divisible by 3.
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),
  mergeAll()
);
result.subscribe(x => console.log(x));
We have the interval$ Observable, which emits a number every 2 seconds. Then we have an identical Observable for openings. The values emitted by interval$ are piped to the windowToggle operator, which takes the openings Observable as its first argument. Since openings emits every 2 seconds, we start a new window every 2 seconds.
Then we have the closingSelector function as the second argument:
i => (i % 3 === 0 ? interval(500) : EMPTY)
It keeps the window open for 500 milliseconds when the value from openings is divisible by 3, and closes it right away with EMPTY otherwise. This means that only about every third value emitted from interval$ is logged.
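The open-and-close behaviour described above can be sketched on plain arrays of timestamped values. The names Tick and windowToggleModel are hypothetical, and the model makes two assumptions: a closing Observable is reduced to a delay in milliseconds (500 standing in for interval(500), 0 standing in for EMPTY closing the window at once, as the text describes), and a source value arriving at the exact moment a window opens is counted as inside it.

```typescript
// A value paired with the time (in ms) at which it is emitted.
interface Tick<T> {
  time: number;
  value: T;
}

// Plain-array model of windowToggle. Hypothetical helper: closeAfter maps
// an opening value to how long (in ms) its window stays open.
function windowToggleModel<T, O>(
  source: Tick<T>[],
  openings: Tick<O>[],
  closeAfter: (opening: O) => number
): T[][] {
  return openings.map(opening => {
    const closeAt = opening.time + closeAfter(opening.value);
    return source
      .filter(s => s.time >= opening.time && s.time < closeAt)
      .map(s => s.value);
  });
}

// interval(2000) emits value i at (i + 1) * 2000 ms.
const every2s: Tick<number>[] = [];
for (let i = 0; i < 7; i++) {
  every2s.push({ time: (i + 1) * 2000, value: i });
}

// The same stream serves as both source and openings, as in the example.
const toggled = windowToggleModel(every2s, every2s, i => (i % 3 === 0 ? 500 : 0));
console.log(toggled); // [[0], [], [], [3], [], [], [6]]
```

In this model only the windows opened by 0, 3, and 6 stay open long enough to catch a value, which matches the every-third-value pattern.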
windowWhen
The windowWhen operator branches the source Observable into windows using the closingSelector function, which returns an Observable that closes the current window.
It takes the closingSelector function as its only argument. The function takes no arguments and returns an Observable. When that Observable emits a next or complete signal, the current window closes and a new one opens.
For example, we can use it as follows:
import { interval } from "rxjs";
import { mergeAll, take, windowWhen, map } from "rxjs/operators";

const interval$ = interval(2000);
const result = interval$.pipe(
  // Each window gets its own random lifetime of up to 4 seconds.
  windowWhen(() => interval(Math.random() * 4000)),
  map(win => win.pipe(take(2))), // at most 2 values per window
  mergeAll()
);
result.subscribe(x => console.log(x));
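In the code above, we have interval$, which emits a number every 2 seconds. The emitted values are piped to the windowWhen operator, which has the following closingSelector function:
() => interval(Math.random() * 4000)
This means each window closes, and the next one opens, after a random delay of up to 4000 milliseconds (Math.random() * 4000).
We should see the numbers logged as they arrive. Since each window lasts less than 4 seconds and interval$ emits every 2 seconds, a window holds at most 2 values, so take(2) rarely drops anything.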
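Because the real example uses random durations, its windows differ on every run. As a reproducible sketch, the model below replaces Math.random() * 4000 with a fixed list of durations. Stamp and windowWhenModel are hypothetical names, and the model assumes each window covers the half-open range from its opening time up to, but not including, its closing time.

```typescript
// A value paired with the time (in ms) at which it is emitted.
interface Stamp<T> {
  time: number;
  value: T;
}

// Plain-array model of windowWhen. Hypothetical helper: each entry in
// durations is how long one window stays open before the next one starts.
function windowWhenModel<T>(source: Stamp<T>[], durations: number[]): T[][] {
  const windows: T[][] = [];
  let openAt = 0;
  for (const duration of durations) {
    const closeAt = openAt + duration;
    windows.push(
      source.filter(s => s.time >= openAt && s.time < closeAt).map(s => s.value)
    );
    openAt = closeAt; // the next window opens the moment this one closes
  }
  return windows;
}

// interval(2000) emits value i at (i + 1) * 2000 ms.
const whenSource: Stamp<number>[] = [];
for (let i = 0; i < 4; i++) {
  whenSource.push({ time: (i + 1) * 2000, value: i });
}

// Fixed durations (all under 4000 ms) standing in for the random ones.
const whenWindows = windowWhenModel(whenSource, [3000, 1000, 3500, 2500]);
console.log(whenWindows); // [[0], [], [1, 2], [3]]
```

With every duration below 4000 milliseconds, no window in this model ever holds more than 2 values.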
To summarize, the windowCount operator branches the source Observable’s values into nested Observables, each emitting at most windowSize values. windowSize is the size of each window.
The windowTime operator returns an Observable that emits windows of items, with windowTimeSpan setting how long each window stays open.
windowToggle branches the source Observable’s values into nested Observables, with each window starting when openings emits and ending when the Observable returned by closingSelector emits. The openings Observable and the closingSelector function control the opening and closing of each window, respectively.
The windowWhen operator branches the source Observable into windows using the closingSelector function, which returns an Observable that closes the current window.