RxJS is a library for reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.
In this article, we’ll look at some filtering and join operators: throttle, throttleTime, combineAll and concatAll.
Filtering Operators
Throttle
The throttle operator emits a value from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, then repeats the process.
It takes up to 2 arguments. The first is the durationSelector, a function that takes a value from the source Observable and returns an Observable or Promise that determines the throttling duration for that value.
The second argument is an optional config object that defines the leading and trailing behavior. It defaults to defaultThrottleConfig, which is { leading: true, trailing: false }.
It returns an Observable that performs the throttle operation from the source.
For example, we can use it as follows:
import { interval } from "rxjs";
import { throttle } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttle(ev => interval(5000)));
result.subscribe(x => console.log(x));
The code above has an interval$ Observable that emits a number every second. Its values are piped into the throttle operator, which takes a function that returns the interval(5000) Observable, which emits a number every 5 seconds.
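Therefore, after each value it lets through, the throttle operator ignores values from interval$ for 5 seconds, since our callback function returns interval(5000).
The value that arrives just as the 5-second window ends is typically still ignored, so we should see roughly every sixth number logged by console.log: 0, 6, 12 and so on.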
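To make the timing easier to follow, here is a minimal sketch that models leading-edge throttling over a list of timestamped values in plain JavaScript. It is not how RxJS implements throttle: simulateThrottle, durationFor and the event objects are hypothetical names made up for illustration, durations are plain millisecond numbers rather than Observables, and the rule that a value arriving exactly at the end of the window is ignored is an assumption about timer ordering.

```javascript
// Plain-JavaScript model of leading-edge throttling; this is NOT RxJS code.
// events: array of { time, value } sorted by time (milliseconds).
// durationFor: hypothetical stand-in for durationSelector; it returns the
// length of the silence window (ms) that follows the value just emitted.
function simulateThrottle(events, durationFor) {
  const emitted = [];
  let silentUntil = -Infinity; // end of the current silence window
  for (const { time, value } of events) {
    // Assumption: a value arriving exactly when the window ends is still
    // ignored, modelling the source timer firing before the duration timer.
    if (time <= silentUntil) continue;
    emitted.push(value);
    silentUntil = time + durationFor(value);
  }
  return emitted;
}

// Model interval(1000): value n arrives at (n + 1) * 1000 ms.
const events = Array.from({ length: 15 }, (_, n) => ({
  time: (n + 1) * 1000,
  value: n
}));

const throttled = simulateThrottle(events, () => 5000);
console.log(JSON.stringify(throttled)); // [0,6,12]
```

Because durationFor receives the emitted value, the same sketch can also model a window whose length depends on that value, which is what separates throttle from a fixed-duration throttle.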
throttleTime
throttleTime emits a value from the source Observable, then ignores subsequently emitted values for duration milliseconds, then repeats the process.
It takes up to 3 arguments. The first is the duration, which is the time to wait before emitting another value after emitting the last value. It’s measured in milliseconds or the time unit of the optional scheduler.
The second argument is the optional scheduler, which defaults to async. It’s used for managing the timers that control the throttling.
The last argument is the optional config object, which defines the leading and trailing behavior. It defaults to defaultThrottleConfig, which is { leading: true, trailing: false }.
For example, we can use it as follows:
import { interval } from "rxjs";
import { throttleTime } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttleTime(5000));
result.subscribe(x => console.log(x));
The above works like our previous throttle example, except that we change throttle(ev => interval(5000)) to throttleTime(5000), which does the same thing.
After each value it lets through, throttleTime ignores values from interval$ for 5 seconds.
Then we get the same numbers logged as in the example above.
Join Operators
combineAll
The combineAll operator flattens an Observable of Observables by applying combineLatest once the outer Observable completes.
It takes one optional argument, which is a project function that maps the combined latest values to something we want.
Once the outer Observable completes, it subscribes to all collected Observables and combines their values as follows:
- Every time an inner Observable emits, the output Observable emits (once every inner Observable has emitted at least one value).
- If a project function is specified, it’s called with the most recent value from each inner Observable, and the output Observable emits whatever it returns.
- If there’s no project function, the output Observable emits an array of the most recent values.
For example, we can use it as follows:
import { of } from "rxjs";
import { map, combineAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(combineAll());
result.subscribe(x => console.log(x));
The code above maps each value emitted by of$ to a child Observable, of("a", "b", "c").
Then we use combineAll to combine the latest values from each of the child Observables in higherOrderObservable into an array. Each of these arrays is emitted by the result Observable.
Then we get:
["c", "c", "a"]
["c", "c", "b"]
["c", "c", "c"]
as the console.log output. The first 2 child Observables emit all of their values and complete before the third one is subscribed to, so their latest value is always 'c'. Then, as the third one emits 'a', 'b' and 'c' in turn, each becomes the last value in the emitted array.
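The output above can be reproduced with a small plain-JavaScript model of the combining step. This is only a sketch, not RxJS's implementation: simulateCombineAll is a hypothetical helper, and it assumes every inner source is synchronous (like of(...)), so each one replays all of its values before the next one is subscribed to.

```javascript
// Plain-JavaScript model, NOT RxJS code: each inner source is an array whose
// values are replayed synchronously, one source after another, like of(...).
function simulateCombineAll(innerSources) {
  const latest = new Array(innerSources.length);
  const hasValue = new Array(innerSources.length).fill(false);
  const output = [];
  innerSources.forEach((values, index) => {
    for (const value of values) {
      latest[index] = value;
      hasValue[index] = true;
      // Emit only once every inner source has produced at least one value.
      if (hasValue.every(Boolean)) output.push([...latest]);
    }
  });
  return output;
}

const inner = ["a", "b", "c"];
const combined = simulateCombineAll([inner, inner, inner]);
console.log(JSON.stringify(combined));
// [["c","c","a"],["c","c","b"],["c","c","c"]]
```

Nothing is emitted while the first two sources replay because the third has not produced a value yet, which is why only three arrays appear rather than nine.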
concatAll
The concatAll operator flattens a higher-order Observable by concatenating its inner Observables in order: it subscribes to the next inner Observable only after the previous one completes.
It takes no parameters and returns an Observable that emits the values from all inner Observables concatenated.
For example, we can use it as follows:
import { of } from "rxjs";
import { map, concatAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(concatAll());
result.subscribe(x => console.log(x));
Each value emitted by of$ is mapped to the of("a", "b", "c") Observable.
Then higherOrderObservable, which emits 3 of("a", "b", "c") Observables, is piped into concatAll, which emits the values of each inner Observable one after another.
In the end, we get:
a
b
c
a
b
c
a
b
c
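With synchronous inner Observables the ordering is easy to miss, so here is a rough plain-JavaScript model of concatenation with timed values. It is a sketch under stated assumptions rather than RxJS code: simulateConcatAll and the { delay, value } shape are hypothetical, all inner sources are assumed to be available up front, and each inner source is assumed to complete at the moment of its last value.

```javascript
// Plain-JavaScript model, NOT RxJS code. Each inner source is an array of
// { delay, value }, where delay is measured in ms from the moment that
// inner source is subscribed to.
function simulateConcatAll(innerSources) {
  const output = [];
  let subscribedAt = 0; // when the current inner source is subscribed to
  for (const inner of innerSources) {
    let completedAt = subscribedAt;
    for (const { delay, value } of inner) {
      const time = subscribedAt + delay;
      output.push({ time, value });
      completedAt = Math.max(completedAt, time);
    }
    // The next inner source starts only after this one has completed.
    subscribedAt = completedAt;
  }
  return output;
}

const slow = [{ delay: 300, value: "slow" }];
const fast = [{ delay: 100, value: "fast" }];
const timeline = simulateConcatAll([slow, fast]);
console.log(JSON.stringify(timeline));
// [{"time":300,"value":"slow"},{"time":400,"value":"fast"}]
```

Even though the second source needs only 100 ms, its value shows up at 400 ms in this model because it is not started until the first source has finished.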
The throttle operator emits a value from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, and repeats.
throttleTime emits a value from the source Observable, then ignores subsequently emitted values for a given amount of time, and repeats.
combineAll flattens an Observable of Observables by combining the latest values of the inner Observables once the outer Observable completes.
concatAll emits the values of the inner Observables in order, one inner Observable after another.