Categories
JavaScript Rxjs

More Useful Rxjs Transformation Operators

RxJS is a library for reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some RxJS transformation operators like concatMapTo, expand, exhaust, and exhaustMap operators.

concatMapTo

The concatMapTo operator takes the source value of the Observable and maps each value to the same inner Observable.

It takes up to 2 arguments. The first is the innerObservable, which is the Observable to map each value from the originating Observable to.

The second argument is a resultSelector function to let us pick the results.

Each originating Observable’s emitted value is mapped to the given innerObservable regardless of the source value. Then flatten those resulting Observables into one Observable. which is the returned Observable.

Each new innerObservable’s emitted are concatenated together with the other ones’ values.

If the source values arrive endlessly and faster than the corresponding inner Observables can complete then it’ll result in memory issues since there’ll be a large amount of data amassed waiting to be subscribed to.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { take, concatMapTo } from "rxjs/operators";
const clicks = fromEvent(document, "click");  
const result = clicks.pipe(concatMapTo(interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

The code above will get the click events from the document and then emit 0 after 1 second, then wait for another second and emit, up to 4.

exhaust

The exhaust operator converts a higher-level Observable, which are Observables that emits Observables, into the first-order Observable by emitted inner Observables while previous inner Observable hasn’t been completed.

The result would be that the values from all the Observables are flattened and emitted by one Observable, which is the one returned by this operator.

For example, we can write:

import { interval, of } from "rxjs";  
import { exhaust, map, take } from "rxjs/operators";
const observable = of(1, 2, 3);  
const higherOrder = observable.pipe(map(ev => interval(1000).pipe(take(5))));  
const result = higherOrder.pipe(exhaust());  
result.subscribe(x => console.log(x));

In the code above, we have an observable Observable that emits 1, 2, and 3. The values emitted by observable are mapped to inner Observables using the map operator into interval(1000).pipe(take(5) .

Since the inner Observables that aren’t completed are dropped by the exhaust operator, we’ll get that interval(1000).pipe(take(5) is only emitted once since only the first instance of it is started. Then we get that a number is emitted every second, starting with 0 and stops at 4.

exhaustMap

The exhaustMap operator also takes the first inner Observable that are mapped from the originating Observable like the exhaust operator. The difference is that it takes a function that lets us map the values of the origination Observable’s emitted values into another Observable, but only taking the first emitted Observable by this operator.

For example, if we have the following code:

import { interval, of } from "rxjs";  
import { take, exhaustMap } from "rxjs/operators";
const observable = of(1, 2, 3);  
const result = observable.pipe(exhaustMap(ev => interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

We get the same result as exhaust, but we don’t have to pipe and map the emitted values of observable into a new Observable. Instead, we combined everything together with the exhaustMap operator.

expand

The expand operator recursively projects each source value to an Observable which is then merged into the output Observable.

It takes 3 arguments, which is the project function, which applies to the items emitted by the source Observable or the output Observable and returns a new Observable.

The second argument is the concurrency, which is an optional argument for the maximum number of input Observables being subscribed to concurrently. It defaults to Number.POSITIVE_INFINITY .

The last argument is the optional scheduler object, which is the scheduler we can use to time the emission of values.

One simple example usage of this operator would be as follows:

import { of } from "rxjs";  
import { expand, delay, take } from "rxjs/operators";
const powersOfTwo = of(1, 2, 3).pipe(  
  expand(x => of(x).pipe(delay(1000))),  
  take(10)  
);  
powersOfTwo.subscribe(x => console.log(x));

In the code above, we have the of(1, 2, 3) Observable, which have the emitted values pipe d to the expand(x => of(x).pipe(delay(1000))) Observable. Then the first 10 values are taken from the Observable returned by expand, which takes the of(1, 2, 3) Observable’s emitted values and emit them repeatedly until we have 10 values emitted and each group is emitted after waiting 1 second.

This is because we specified of(x) which takes the of(1, 2, 3) values and then get then emit them without making any changes. pipe(delay(1000)) delays the emission of each group of values 1, 2 and 3 one second apart.

This should result in the following output:

1  
2  
3  
1  
2  
3  
1  
2  
3  
1

Conclusion

concatMapTo operator takes the source value of the Observable and maps each value to the same inner Observable.

The exhaust operator converts an Observable which emits Observables and then emit the values from the first inner Observable and discard the ones that haven’t finished emitting.

The exhaustMap operator also takes the first inner Observable then map them into inner Observables, which then have the values emitted.

Finally, the expand operator recursively projects each source value to an Observable which is then merged into the output Observable and emitted.

Categories
JavaScript Rxjs

More Useful Rxjs Creation Operators

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at more creation operators from Rxjs, like functions that create Observables from event handlers, functions that generate Observables that emit numbers, and functions that let us conditionally subscribe to Observables.

fromEvent

The fromEvent function creates an Observable that emits event objects of the type that we specified as it happens to the event target.

It takes up to 4 arguments. The first argument is the event target. which is required. It can be a DOM event target, Node.js event emitter, jQuery like event target, NodeList, or HTMLCollection to attach event handlers to.

The second argument is the event name that’s being emitted by the event target.

The 3rd argument is an optional argument with some options.

The last argument is an optional result selector function.

Every time the Observable is subscribed to, the event handler function will be registered to the event target on the given event type.

When the event fires, the event object will emitted by the Observable.

The event targets are checked by duck typing. We can safely use fromEvent on the object on the object if it exposes the following methods:

  • DOM event target if it has the addEventLister and removeEventListener methods
  • Node.js event emitter if it has the addListener and removeListener methods
  • jQuery style objects if it has the on and off methods
  • DOM NodeLists or HtmlCollection if they have a list of DOM nodes returned by methods like document.querySelectorAll or the childNodes property of a DOM node.

We can use fromEvent as follows:

import { fromEvent } from "rxjs";
const clicks = fromEvent(window, "click");  
clicks.subscribe(x => console.log(x));

The code above watches for clicks on the document object. We should see MouseEvent objects logged when we click anywhere on the browser tab.

fromEventPattern

This is similar to fromEvent , except that we pass in handler functions for adding event listeners and removing event listeners.

It takes 3 arguments. The first 2 are the add and remove handlers respectively. The remove handler is optional. The last argument is an optional result selector function for manipulating emitted values.

For example, we can use it to detect the clicks on a div with ID app as follows:

import { fromEventPattern } from "rxjs";
const app = document.querySelector("#app");

function addClickHandler(handler) {  
  app.addEventListener("click", handler);  
}

function removeClickHandler(handler) {  
  app.removeEventListener("click", handler);  
}

const clicks = fromEventPattern(addClickHandler, removeClickHandler);  
clicks.subscribe(x => console.log(x));

We should see MouseEvent objects logged when we click anywhere on a div with the ID app .

generate

The generate function creates an Observable with a stream of values by passing in the initial state, a function with the condition for the ending the emitting of values, a function for iterating through the values, result selector function for selecting the emitted results, and a scheduler object for changing the timing for emitting the values.

Only the initial state is required. For example, we can create an Observable that emits the values 0 to 9 by writing:

import { generate } from "rxjs";
const generated = generate(0, x => x < 10, x => x + 1);  
generated.subscribe(  
  value => console.log(value),  
  err => {}  
);

The first argument of the generate function call has the first value to emit or the initial state. The second has the ending condition, which is less than 10. The last argument has the function that indicates how to move on and emit the next item and move towards the ending condition in the second argument.

interval

interval creates an Observable that emits sequential numbers in a specified interval of time.

It takes 2 optional arguments. The first is the number of milliseconds or the time unit determined by the scheduler’s clock. The default is 0.

The second argument is the scheduler to use, which defaults to async .

For example:

import { interval } from "rxjs";
const numbers = interval(1000);

creates an Observable that emits a new value every second.

of

This creates an Observable out of its arguments.

It takes an infinite number of arguments.

For example, we can use it as follows:

import { of } from "rxjs";
of(1, 2, 3).subscribe(  
  val => console.log(val),  
  err => console.log(err),  
  () => console.log("end")  
);

range

We can use range to create an Observable that emits a sequence of numbers within the specified range.

It takes 3 optional arguments, which are the start, which defaults to 0. The number of integers to generate, which defaults to undefined and the scheduler to use, which defaults to undefined .

For example, we can use it to create an Observable which emits number 1 to 20 as follows:

import { range } from "rxjs";
const numbers = range(1, 20);  
numbers.subscribe(x => console.log(x));

throwError

Creates an Observable that only emits an error notification.

It takes up to 2 arguments. The first is the error to emit. The second is an optional scheduler argument to let us choose the scheduler. It defaults to undefined .

We can use it as follows:

import { throwError } from "rxjs";
const numbers = throwError("error");  
numbers.subscribe(() => {}, err => console.log(err));

We subscribed to the error notification in the second argument.

timer

timer creates an Observable that starts emitting values after a specified time and emit ever-increasing number after a specified interval thereafter.

The first argument is the time that the Observable starts emitting, which defaults to 0. The number is in milliseconds.

The second argument is the period of emitting values which defaults to undefined . The number is in milliseconds.

The last argument is the scheduler to use, which defaults to undefined .

For example, we can create an Observable to emit values after 2 seconds, then every second thereafter by writing:

import { timer } from "rxjs";
const numbers = timer(2000, 1000);  
numbers.subscribe(x => console.log(x));

iif

Let us create an Observable that decides which Observable will be subscribed to at subscribe time.

It takes up to 3 arguments. The first is the condition for which Observable to be chosen. The 2nd and 3rd arguments are the Observable that are chosen when the condition is true and false respectively.

For example, we can use it as follows:

import { iif, of } from "rxjs";
let wantFoo;  
const fooBar = iif(() => wantFoo, of("foo"), of("bar"));
wantFoo = true;  
fooBar.subscribe(val => console.log(val));
wantFoo = false;
fooBar.subscribe(val => console.log(val));

In the code above, if wantFoo is true , when of('foo') is subscribed. Otherwise, of('bar') is subscribed.

We can create Observables that handle DOM or Node.js events with the fromEvent creation operator.

of operator lets us create Observables from any list of objects.

throw only throws an error and does nothing else.

generate , interval , and range let us create Observables that emit number ranges.

timer lets us create timed Observables and iif lets us create conditional Observables.

Categories
JavaScript Rxjs

Rxjs Filtering Operators — Audit and Debounce

RxJS is a library for doing reactive programming. Creating operators are useful for generating data from various sources to be subscribed to by Observers.

In this article, we’ll look at some filtering operators which let us filter out emitted values in a way that we specify, including the audit, auditTime, debounce, and debounceTime operators.

audit

The audit operator lets us filter out emitted values from the source Observable for a period of time, then emits the most recent values. Then the process is repeated.

It takes one argument, which is the durationSelector function which receives a value from the source Observable to use to compute the duration to which to ignore emitted values.

The durationSelector function can return s promise or an Observable.

It returns the Observable that does the rate-limiting.

It’s similar to throttle, but it returns the last value from the silenced duration instead of the first.

When the duration Observable emits a value or completes, the timer is disabled. Then the most recent source value is emitted from the output Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { audit } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(audit(ev => interval(4000)));  
result.subscribe(x => console.log(x));

The code above takes the interval$ Observable then pipes the value into the audit operator, which has the function which returns interval(4000). This means the value from the returned Observable will emit values every 4 seconds.

The effect produced would be that every 3 values are skipped.

auditTime

The auditTime operator ignores the source Observable’s values for duration milliseconds then emit the most recent value from the source Observable and repeats the process.

It takes up to 2 arguments. One is the duration , which is the time to wait before emitting the most recent value. The duration is measured in milliseconds or the time unit determined by the optional scheduler argument.

The second is an optional scheduler to let us time the values emitted.

It returns the Observable that does the rate-limiting.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { auditTime } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(auditTime(5000));  
result.subscribe(x => console.log(x));

The code above takes the emitted values from the interval$ Observable, which emits numbers every second, then pipe s it to the auditTime(5000) operator, which emits the latest value from the interval$ Observable every 5 seconds.

The result will be that we get the latest value emitted from interval$ every 5 seconds.

debounce

debounce emits a value from the source Observable after a particular time span has passed without another source emission.

It takes one argument, which is a durationSelector function that receives a value from the source Observable for computing the timeout duration for each source value.

The durationSelector function can return a promise or an Observable.

It returns an Observable that emits values according to the delay specified by the return value of the durationSelector function. The returned Observable may drop some values if emissions from the source happen too frequently.

For example, we can use it as follows. Given that we have the following input element:

<input type="text" />

We can write the following JavaScript code to delay the emission of the input value for 2 seconds:

import { fromEvent, interval } from "rxjs";  
import { debounce } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("input"), "input");  
const result = clicks.pipe(debounce(() => interval(2000)));  
result.subscribe(x => console.log(x));

The input events are received by the fromEvent function, then it’s pipe d to our durationSelector function which is () => interval(2000) via the debounce operator.

debounceTime

Like the debounce operator, the debounceTime operator emits values from the source Observable after a delay has passed without another emission from the source Observable.

The difference is in the argument it accepts. It takes up to 2 arguments. The first is the dueTime number, which is the duration in milliseconds or the time unit specified by the optional scheduler argument for the time to delay emission of values.

The second argument is the optionalscheduler argument, which defaults to async . It lets us time the emissions as we want to.

It returns an Observable that delays the emission of the source Observable by the specified dueTime and may drop some values if emissions from the source happen too frequently.

For example, we can use it as follows. Given that we have the following input element:

<input type="text" />

We can use the debounceTime operator to delay the emission of input event objects as follows:

import { fromEvent } from "rxjs";  
import { debounceTime } from "rxjs/operators";

const clicks = fromEvent(document.querySelector("input"), "input");  
const result = clicks.pipe(debounceTime(2000));  
result.subscribe(x => console.log(x));

The code above will delay the emission of input events objects by 2 seconds and drops whatever is emitted less than 2 seconds.

audit and auditTime lets us filter out emitted values from the source Observable for a period of time, then emits the most recent values. Then the process is repeated.

debounce and debounceTime operators emit values from the source Observable after a delay has passed without another emission from the source Observable.

Categories
JavaScript Rxjs

More Rxjs Transformation Operators — Window

RxJS is a library for reactive programming. Creating operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some window operators, including the windowCount, windowTime, windowToggle and windowWhen operators.

windowCount

The windowCount operator branch out the source Observable values as a nested Observable with each of them emitting at most windowSize events.

It takes up to 2 arguments. The first argument is the windowSize, which the maximum number of values to be emitted by each window.

The second argument is optional. It’s the startWindowEvery number, which defaults to 0. It’s the interval to start a new window. The interval is measured by the number of items emitted by the source Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));  
const result = nums.pipe(  
  windowCount(3, 3),  
  map(win => win.pipe(skip(1))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable which emits a number every second up to 1000.

That’s pipe d to the windowCount operator which emits 3 values at a time and starts a new window after 3 values emitted from nums .

Then that is map ped to the win.pipe(skip(1)) Observable which skips 1 value for every 2 values emitted.

Finally, we pipe the values to mergeAll to merge all the Observables into one.

Then we should see that every third number isn’t emitted.

windowTime

The windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan.

It takes up to 2 arguments, which is the windowTimeSpan, and the second is an optional argument, which is a scheduler object.

An example would the following:

import { interval } from "rxjs";  
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);  
const result = nums.pipe(  
  windowTime(1000, 5000),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the nums Observable, which emits values start from 0 every second. Then the emitted values are pipe d to the windowTime operator, which starts a window every 5 seconds that’s 1 second long. Then we take 2 values from each window

This will result in 3 values being skipped in each window since values are emitted every minute by nums but we take only 2 values from every window.

windowToggle

windowToggle branches the source Observable values as nested Observable starting from emitting from openings and ending with the closingSelector emits.

It takes up to 2 arguments. The first is the openings , which is an Observable of notifications to start a new window.

The second is the closingSelector which takes the value emitted by the openings and returns an Observable which emits the next or complete signal will close the window.

We can use it as follows:

import { interval, EMPTY } from "rxjs";  
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);  
const openings = interval(2000);  
const result = interval$.pipe(  
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

We have interval$ Observable which emits a number every 2 seconds. Then we have the same Observable for openings . The emitted values for interval$ are pipe d to the windowToggle operator, which has the openings Observable as the first argument, which emits every 2 seconds. So we start a new window every 2 seconds.

Then we have the second function:

i => (i % 3 === 0 ? interval(500) : EMPTY)

to close the window when the value piped in isn’t divisible by 3. This means that we get every 3 values emitted from interval$ logged.

windowWhen

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.

It takes the closingSelector function which takes the value emitted by the openings and returns an Observable which emits the next or complete signal will close the window.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);  
const result = interval$.pipe(  
  windowWhen(() => interval(Math.random() * 4000)),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

In the code above, we have the interval$ which emits a number every 2 seconds. Then the emitted value is pipe d to the windowWhen operator, which has the closingSelector ve:

() => interval(Math.random() \* 4000)

This means the window will close and reopen Math.random() * 4000 milliseconds.

We should be that some numbers are emitted faster than others.

The windowCount operator branch out the source Observable values as nested Observable with each of them emitting at most windowSize events. windowSize is the size of each window.

windowTime operator returns an Observable that emits windows of items with the window period set by the windowTimeSpan . windowTimeSpan sets the amount of time a window is open.

windowToggle branches the source Observable values as nested Observable starting from emitting from openings and ending with the closingSelector emits. The openings and closingSelector functions are Observables that control the opening and closing of the window for each Observable respectively.

The windowWhen operator branches out the source Observable using the closingSelector function, which returns an Observable to close the window.

Categories
JavaScript Rxjs

More Rxjs Transformation Operators — Scan and Window

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at how to use transformation operators scan , switchMap , switchMapTo and window .

scan

The scan operator applies an accumulator function over the source Observation by combining the emitted values. Then it returns each intermediate result, with an optional seed value.

It takes up to 2 arguments. The first is the accumulator function, which is the function that combines the value that was accumulated so far with the new emitted value.

The second is an optional argument, which is the seed , or the initial accumulation value.

For example, we can use it as follows:

import { of } from "rxjs";  
import { scan } from "rxjs/operators";
const of$ = of(1, 2, 3);  
const seed = 0;  
const count = of$.pipe(scan((acc, val) => acc + val, seed));  
count.subscribe(x => console.log(x));

The code above starts with an of Observable and a seed initial value of 0. Then we pipe the values from the of$ Observable to the scan operator, which has the accumulation function, which adds the accumulated value to the newly emitted value. We also set seed to 0.

Then we subscribe to the count Observable that’s results from this.

switchMap

The switchMap operator projects each source value to an Observable which is then merged into one output Observable. Only the values from the most recently projected Observable is emitted.

It takes up to 2 arguments. The first is a project function which takes the emitted value of the source Observable as a parameter and then returns a new Observable from it.

The second is an optional resultSelector argument. It’s a function that lets us select the result from the emitted values of the new Observable.

We can use it as follows:

import { of } from "rxjs";  
import { switchMap } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMap(x => of(x, x * 2, x * 3)));  
switched.subscribe(x => console.log(x));

The code above takes the values emitted from the of(1, 2, 3) Observable and then pipe the result into the switchMap operator, which has a function which maps the value emitted from the of(1, 2, 3) Observable and return of(x, x * 2, x * 3) , where x is 1, 2 or 3 from the of(1, 2, 3) Observable.

This means that we get 1, 1*2 and 1*3 which are 1, 2 and 3. Then the same is done with 2, so we get 2 , 2*2 and 2*3 , which are 2, 4 and 6. Finally, we get 3 , 3*2 , and 3*3 , which 3, 6 and 9.

So we get the following output:

1  
2  
3  
2  
4  
6  
3  
6  
9

switchMapTo

The switchMapTo operator projects each source value to the same Observable. The Observable then is flatten multiple times with switchMap in the output Observable.

It takes up to 2 arguments. The first is an Observable which we replace each emitted value of the source Observable with.

The second is an optional argument, which is the resultSelector function which lets us select the value from the new Observable.

For example, we can use it as follows:

import { of, interval } from "rxjs";  
import { switchMapTo, take } from "rxjs/operators";
const switched = of(1, 2, 3).pipe(switchMapTo(interval(1000).pipe(take(3))));  
switched.subscribe(x => console.log(x));

The code above will map the emitted values of the of(1, 2, 3) Observable into the interval(1000).pipe(take(3)) which emits values from 0 to 3 spaced 1 second apart.

The result will be that we get 0, 1, 2, and 3 as the output.

window

The window operator branch out the source Observable values as a nested Observable when the windowBoundaries Observable emits.

It takes one argument, which is the windowBoundaries Observable that’s used to complete the previous window and starts a new window.

Like buffer it buffers the emitted values then emits them all at once some condition is met but emits an Observable instead of an array.

For instance, we can use it as follows:

import { interval, timer } from "rxjs";  
import { window, mergeAll, map, take } from "rxjs/operators";
const timer$ = timer(3000, 1000);  
const sec = interval(6000);  
const result = timer$.pipe(  
  window(sec),  
  map(win => win.pipe(take(2))),  
  mergeAll()  
);  
result.subscribe(x => console.log(x));

The code above has the timer(3000, 1000) Observable which emits values every second starting 3 seconds after it’s been initialized.

We also have a sec Observable that emits numbers every 6 seconds. We use that for windowing with the window operator. This means that value from the timer$ Observable will emit values, which will then be pipe d to the window operator.

This will then be piped to the map operator, which will take the first 2 values that were emitted from the window operator. Then all the results will be merged together with mergeAll .

In the end, we get 2 numbers from timer$ emitted every 6 seconds.

The scan operator applies an accumulator function over the source Observation by combining the emitted values. Then it returns each intermediate result, with an optional seed value.

switchMap projects each source value to an Observable which is then merged into one output Observable. Only the values from the most recently projected Observable are emitted.

Like switchMap,switchMapTo projects each source value an Observable. But unlike switchMap, it projects to the same Observable. The Observable then is flatten multiple times with switchMap in the output Observable.

The window operator branch out the source Observable values as a nested Observable when the windowBoundaries Observable, which is used for closing and opening the window emits.