Basic Usage of Rxjs Operators

With operators, we can do more with Rxjs observables. They’re useful for doing complex operations with asynchronous code.

Operators are functions. There are two types of operators:

  • Pipeable operators — they can be piped to Observables using the pipe method available in Observables. They return a new Observable, so multiple pipeable operators can be chained together
  • Creation operators — these are standalone functions to create a new Observable. For example, of(1,2,3) will emit 1, 2, and 3.

Using Operators

We can import operators from RxJS and use them to transform the values that our Observables emit.

For example, we can map values from Observables to another using the map function:

import { of } from "rxjs";
import { map } from "rxjs/operators";

map(x => x * 2)(of(1, 2, 3)).subscribe(val => console.log(val));

We created an Observable that emits 1, 2, and 3 with the of function. Calling map with a callback returns a function. Passing our Observable into that function returns a new Observable that emits the mapped values.

map(x => x * 2)(of(1, 2, 3))

returns a new Observable that we can subscribe to. Then we get the values 2, 4, and 6 in the callback function passed to subscribe.
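To see why map(x => x * 2) can be called with an Observable, here is a minimal sketch of the same shape in plain JavaScript. fromValues and mapSketch are hypothetical stand-ins written for illustration, not the RxJS implementation, and the stand-in only supports a next callback.

```javascript
// Hypothetical stand-in for an Observable: an object with a subscribe method.
const fromValues = (...values) => ({
  subscribe(next) {
    values.forEach(value => next(value));
  }
});

// A map-like operator: takes a callback and returns a function that
// turns a source into a new source emitting the mapped values.
const mapSketch = fn => source => ({
  subscribe(next) {
    source.subscribe(value => next(fn(value)));
  }
});

const doubled = [];
mapSketch(x => x * 2)(fromValues(1, 2, 3)).subscribe(v => doubled.push(v));
console.log(doubled); // [2, 4, 6]
```

The operator never touches the source until subscribe is called on the new object, which mirrors how the mapped Observable stays idle until we subscribe to it.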

Piping

We can use the Observable’s pipe method to compose multiple operations into one.

It takes multiple operations as arguments, which is much cleaner than nesting them.

For example, we can use it to rewrite the example we had above:

import { of } from "rxjs";
import { map } from "rxjs/operators";

of(1, 2, 3)
  .pipe(map(x => x * 2))
  .subscribe(val => console.log(val));

We get the same result, but it’s much cleaner.

Also, we can pass in more than one operation to the pipe method:

of(1, 2, 3)
  .pipe(
    map(x => x * 2),
    map(x => x * 3)
  )
  .subscribe(val => console.log(val));

This first multiplies each value emitted by the Observable by 2, then multiplies the result by 3. As a result, console.log outputs 6, 12, and 18.
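The way pipe applies its arguments can be pictured as a left-to-right function composition. The sketch below assumes simplified operators that work on plain arrays instead of Observables. pipeSketch and mapArray are hypothetical names used only for this illustration.

```javascript
// Hypothetical operator factory: returns a function from array to array.
const mapArray = fn => values => values.map(fn);

// Hypothetical pipe helper: feeds the source through each operator in order.
const pipeSketch = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const piped = pipeSketch(
  [1, 2, 3],
  mapArray(x => x * 2),
  mapArray(x => x * 3)
);
console.log(piped); // [6, 12, 18]
```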

Creation Operators

Creation operators are functions to create an Observable from scratch or by joining other Observables together.

For example, the interval function creates an Observable that emits increasing integers, starting from 0, at the interval that we specify:

import { interval } from "rxjs";

interval(5000).subscribe(val => console.log(val));

The code above outputs increasing integers, starting from 0, every 5 seconds.

Higher-Order Observables

Higher-order Observables are Observables that emit other Observables. To work with the values of the inner Observables, we flatten the higher-order Observable into an ordinary one.

RxJS has the concatAll() operator, which subscribes to each inner Observable in order, one at a time. It emits all the values of one inner Observable and waits for it to complete before it subscribes to the next.

For example, we can use it as follows:

import { of } from "rxjs";
import { concatAll, map } from "rxjs/operators";

of(1, 2, 3)
  .pipe(
    map(outerVal => {
      console.log(`outerVal ${outerVal}`);
      return of(4, 5, 6);
    }),
    concatAll()
  )
  .subscribe(innerVal => console.log(`innerVal ${innerVal}`));

We should get the output:

outerVal 1
innerVal 4
innerVal 5
innerVal 6
outerVal 2
innerVal 4
innerVal 5
innerVal 6
outerVal 3
innerVal 4
innerVal 5
innerVal 6

As we can see, we get the first value from the of(1, 2, 3) Observable, and then all the values from the inner of(4, 5, 6) Observable that it was mapped to. Then we get the second value from of(1, 2, 3), followed by all the values from its inner Observable, and so on.

Other flattening operators include:

  • mergeAll() — subscribes to each inner Observable as soon as it arrives, and emits each value as it arrives.
  • switchAll() — subscribes to the first inner Observable when it arrives and emits each of its values. When a new inner Observable arrives, it unsubscribes from the previous one and subscribes to the new one.
  • exhaust() — subscribes to the first inner Observable when it arrives and emits each of its values. It discards any inner Observables that arrive before the current one completes, then waits for the next inner Observable. In RxJS 7, this operator was renamed to exhaustAll().

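With the synchronous inner Observables in the example above, they all give the same result as concatAll(). The results differ once the inner Observables emit asynchronously and overlap in time.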
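A rough way to compare these four strategies is to simulate two inner streams that overlap in time. The sketch below is a simplified model in plain JavaScript, not RxJS itself. It assumes each inner stream emits one value per time unit after it is subscribed to, and the flatten function and the inners data are hypothetical.

```javascript
// Simplified model: each inner stream arrives at `arrival` and, once
// subscribed at time `start`, emits values[i] at time start + i + 1.
const inners = [
  { arrival: 0, values: ["a1", "a2", "a3"] },
  { arrival: 1.5, values: ["b1", "b2", "b3"] }
];

// [time, value] pairs for an inner subscribed at `start`,
// keeping only the emissions that happen before `cutoff`.
const emissions = (inner, start, cutoff = Infinity) =>
  inner.values
    .map((value, i) => [start + i + 1, value])
    .filter(([time]) => time < cutoff);

function flatten(strategy) {
  const events = [];
  let busyUntil = -Infinity; // completion time of the running inner stream
  inners.forEach((inner, index) => {
    const next = inners[index + 1];
    if (strategy === "concat") {
      // Wait for the previous inner stream to complete before subscribing.
      const start = Math.max(inner.arrival, busyUntil);
      events.push(...emissions(inner, start));
      busyUntil = start + inner.values.length;
    } else if (strategy === "merge") {
      // Subscribe immediately, values interleave.
      events.push(...emissions(inner, inner.arrival));
    } else if (strategy === "switch") {
      // Drop the current inner stream as soon as the next one arrives.
      const cutoff = next ? next.arrival : Infinity;
      events.push(...emissions(inner, inner.arrival, cutoff));
    } else if (strategy === "exhaust") {
      // Ignore inner streams that arrive while one is still running.
      if (inner.arrival >= busyUntil) {
        events.push(...emissions(inner, inner.arrival));
        busyUntil = inner.arrival + inner.values.length;
      }
    }
  });
  return events.sort((a, b) => a[0] - b[0]).map(event => event[1]);
}

console.log(flatten("concat"));  // ["a1", "a2", "a3", "b1", "b2", "b3"]
console.log(flatten("merge"));   // ["a1", "a2", "b1", "a3", "b2", "b3"]
console.log(flatten("switch"));  // ["a1", "b1", "b2", "b3"]
console.log(flatten("exhaust")); // ["a1", "a2", "a3"]
```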

We can create new Observables by composing multiple Observables or operators.

For example:

of(1, 2, 3)
  .pipe(
    map(x => x * 2),
    map(x => x * 3)
  )

is an Observable that multiplies each value from the of(1, 2, 3) Observable by 6.

With RxJS, there are built-in operators that we can use to create and manipulate Observable values. Also, we can create new Observables and combine them together in different ways.

Observables can also be nested, and the values of all the nested Observables can be obtained by using concatAll(), mergeAll(), switchAll(), or exhaust().

Introduction to Reactive Programming with Rxjs

RxJS is a reactive programming library that lets us use the observer pattern. This pattern lets us watch for changes in our programs and run code accordingly. Observables are the entities that emit values when they pick up changes. Observers are the entities that get the data sent by observables.

In this article, we’ll take a quick look at how to create new observables to send new values to observers.

Components of Rxjs

There are a few parts to RxJS. They are:

  • Observables — entities that get and send changes to observers
  • Observers — entities that watch for new data pushed from observables
  • Operators — functions that process the emitted values as collections, with operations like map, filter, concat, reduce, etc.
  • Subjects — event emitters that broadcast data to multiple observers
  • Schedulers — centralized dispatchers to control concurrency. They let us coordinate when computation happens.

Creating Observables

We need observables to send data to observers. RxJS provides an Observable constructor, which lets us emit data through a subscriber object.

For example, we can write:

import { Observable } from "rxjs";

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

The observable above will emit 1 and 2 immediately, and 3 after 1 second. The subscriber parameter is the object that we use to emit data to the observers.

Calling complete signals that the observable is done, so the subscriber can’t be used to emit more data afterwards.

We can use the observable to get the emitted values as follows:

observable.subscribe(val => console.log(val));

Also, we can pass an object into the subscribe method, with a next method for getting the emitted values, an error method for getting errors, and a complete method to run something when the observable has finished sending data:

observable.subscribe({
  next(x) {
    console.log(x);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.log("done");
  }
});

Pull versus Push

Observables are push systems: the producer decides when to send data and pushes it to the observer.

An Observable is a producer of multiple values. It’s evaluated lazily, so nothing is computed until an observer subscribes.
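The difference between pulling and pushing can be sketched in plain JavaScript. In the hypothetical example below, pullSource is a generator that the consumer pulls values from, while pushSource is a function that pushes values into a callback. Neither is an RxJS API.

```javascript
// Pull: the consumer decides when to ask for the next value.
function* pullSource() {
  yield 1;
  yield 2;
  yield 3;
}
const pulled = [];
for (const value of pullSource()) {
  pulled.push(value);
}

// Push: the producer decides when to deliver values to the consumer.
function pushSource(observer) {
  [1, 2, 3].forEach(value => observer(value));
}
const pushed = [];
pushSource(value => pushed.push(value));

console.log(pulled, pushed); // [1, 2, 3] [1, 2, 3]
```

Both sides end up with the same values. What changes is who is in control of the timing.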

Observables are Like Functions

Observables are like functions in that they both return data for other entities. We can use the returned data in any place we wish.

For example, if we have the following function:

const foo = () => 1

and an observable:

const foo = new Observable(subscriber => {
  subscriber.next(1);
});

foo.subscribe(x => {
  console.log(x);
});

Then they both give us the value 1. However, as we saw before, we can also get more than one value from an Observable, which we can’t do with a function.

Also, they can either be synchronous or asynchronous like functions. From the first example, we have:

subscriber.next(1);
subscriber.next(2);

being run line by line, while:

setTimeout(() => {
  subscriber.next(3);
  subscriber.complete();
}, 1000);

waits 1 second before it runs, which means it’s asynchronous.

Parts of an Observable

Observables are created with the Observable constructor, and we subscribe to them with an observer.

An Observable delivers next, error, or complete notifications to observers. An Observable execution is disposed of automatically once it has delivered an error or complete notification.

Creating an Observable

The Observable constructor takes a callback function with a subscriber parameter, which gives us the subscriber object that we use to emit values.

For example, we can write:

const observable = new Observable((subscriber) => {
  const id = setInterval(() => {
    subscriber.next("foo");
  }, 1000);
});

to emit 'foo' every second to observers.

Subscribing to Observables

We call the subscribe method of the Observable object to subscribe to the values it pushes. For instance, we can write:

observable.subscribe(x => console.log(x));

to get the latest values.

Executions aren’t shared between multiple Observers of the same Observable. Each call to subscribe starts its own independent execution for the given observer.

Executing Observables

The code:

const observable = new Observable((subscriber) => {
  //...
});

defines the Observable, but the callback isn’t run yet. The callback is executed lazily, once for each Observer that subscribes.

An execution can deliver the following notifications:

  • next — sends a value to the observer
  • error — sends an error or exception to the observer
  • complete — signals that no more values will be sent

Observables adhere strictly to the Observable contract, so once complete is called, it’s not going to send more data.

For example:

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4);
});

4 won’t be sent to observers since complete has already been called.
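One way to picture how this rule can be enforced is a wrapper that stops forwarding notifications once complete has been called. The sketch below uses a hypothetical createObservable helper written in plain JavaScript. It only illustrates the idea and isn’t how RxJS is implemented.

```javascript
// Hypothetical minimal stand-in that enforces "nothing after complete".
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      producer({
        next(value) {
          if (!closed) observer.next(value);
        },
        complete() {
          if (!closed) {
            closed = true;
            observer.complete();
          }
        }
      });
    }
  };
}

const received = [];
createObservable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // ignored: the execution has already completed
}).subscribe({
  next: value => received.push(value),
  complete: () => received.push("done")
});

console.log(received); // [1, 2, 3, "done"]
```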

Disposing Observable Executions

We can call the unsubscribe method to stop subscribing to the observable. Once it’s called, the observer stops watching the Observable for more changes, and the resources needed to do the watching are disposed of.

We can return an unsubscribe function from the Observable’s callback to hold the cleanup code that we want to run. For example, we can write:

const observable = new Observable(function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next("hi");
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalId);
  };
});

const subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {
  subscription.unsubscribe();
}, 3000);

The code above has an unsubscribe function that calls clearInterval with the ID returned by setInterval . Then we called subscribe to subscribe to our observable, which returns an object with the unsubscribe method. This method is called after 3 seconds in the callback for setTimeout .
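The link between the returned cleanup function and subscription.unsubscribe() can be sketched without timers. The example below is a simplified stand-in in plain JavaScript. createTeardownObservable and the listeners set are hypothetical, with the set playing the role of the interval.

```javascript
// Hypothetical stand-in: subscribe runs the producer, keeps the cleanup
// function it returns, and exposes it through unsubscribe().
function createTeardownObservable(producer) {
  return {
    subscribe(next) {
      let active = true;
      const teardown = producer(value => {
        if (active) next(value);
      });
      return {
        unsubscribe() {
          if (!active) return;
          active = false;
          if (typeof teardown === "function") teardown();
        }
      };
    }
  };
}

const listeners = new Set(); // stands in for a timer or an event source
const greetings = createTeardownObservable(emit => {
  listeners.add(emit);
  return () => listeners.delete(emit); // cleanup, like clearInterval
});

const seen = [];
const sketchSubscription = greetings.subscribe(value => seen.push(value));
listeners.forEach(emit => emit("hi"));
sketchSubscription.unsubscribe();
listeners.forEach(emit => emit("ignored")); // no listeners remain

console.log(seen, listeners.size); // ["hi"] 0
```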

With Rxjs, we can create Observables to emit values with the subscriber object. Then we can subscribe to observables to watch for values.

We can also create a function that cleans up when unsubscribing and call unsubscribe afterwards when we don’t want to subscribe to an Observable anymore.

More Useful Rxjs Transformation Operators

RxJS is a library for reactive programming. Transformation operators are useful for changing the values emitted by source Observables before they reach Observers.

In this article, we’ll look at some RxJS transformation operators: concatMapTo, expand, exhaust, and exhaustMap.

concatMapTo

The concatMapTo operator takes the source value of the Observable and maps each value to the same inner Observable.

It takes up to 2 arguments. The first is the innerObservable, which is the Observable to map each value from the originating Observable to.

The second argument is a resultSelector function to let us pick the results.

Each value emitted by the originating Observable is mapped to the given innerObservable, regardless of the source value. The resulting Observables are then flattened into one Observable, which is the returned Observable.

The values emitted by each new innerObservable are concatenated with the values of the other ones.

If the source values arrive endlessly and faster than the corresponding inner Observables can complete, it’ll result in memory issues, since a large number of inner Observables will be amassed waiting to be subscribed to.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { take, concatMapTo } from "rxjs/operators";
const clicks = fromEvent(document, "click");  
const result = clicks.pipe(concatMapTo(interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

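The code above gets the click events from the document. For each click, it emits 0 after 1 second, then the next number every second after that, up to 4. Clicks that arrive in the meantime are queued until the current inner Observable completes.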

exhaust

The exhaust operator converts a higher-order Observable, which is an Observable that emits Observables, into a first-order Observable. It does this by dropping inner Observables that arrive while the previous inner Observable hasn’t completed.

The result is that the values from the inner Observables that weren’t dropped are flattened and emitted by one Observable, which is the one returned by this operator.

For example, we can write:

import { interval, of } from "rxjs";  
import { exhaust, map, take } from "rxjs/operators";
const observable = of(1, 2, 3);  
const higherOrder = observable.pipe(map(ev => interval(1000).pipe(take(5))));  
const result = higherOrder.pipe(exhaust());  
result.subscribe(x => console.log(x));

In the code above, we have an Observable named observable that emits 1, 2, and 3. The values emitted by observable are mapped to inner Observables, each created by interval(1000).pipe(take(5)), using the map operator.

Since inner Observables that arrive before the current one completes are dropped by the exhaust operator, only the first instance of interval(1000).pipe(take(5)) is subscribed to. Then a number is emitted every second, starting with 0 and stopping at 4.

exhaustMap

Like the exhaust operator, the exhaustMap operator only takes an inner Observable when the previous one has completed. The difference is that it takes a function that maps each value emitted by the originating Observable into an inner Observable, so the mapping and the flattening are done in one step.

For example, if we have the following code:

import { interval, of } from "rxjs";  
import { take, exhaustMap } from "rxjs/operators";
const observable = of(1, 2, 3);  
const result = observable.pipe(exhaustMap(ev => interval(1000).pipe(take(5))));  
result.subscribe(x => console.log(x));

We get the same result as exhaust, but we don’t have to pipe and map the emitted values of observable into a new Observable. Instead, we combined everything together with the exhaustMap operator.

expand

The expand operator recursively projects each source value to an Observable which is then merged into the output Observable.

It takes up to 3 arguments. The first is the project function, which is applied to the items emitted by the source Observable or the output Observable and returns a new Observable.

The second argument is the concurrency, which is an optional argument for the maximum number of input Observables being subscribed to concurrently. It defaults to Number.POSITIVE_INFINITY .

The last argument is the optional scheduler object, which is the scheduler we can use to time the emission of values.

One simple example usage of this operator would be as follows:

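import { of } from "rxjs";
import { expand, delay, take } from "rxjs/operators";

const repeated = of(1, 2, 3).pipe(
  // Re-emit each value unchanged after a 1-second delay, recursively.
  expand(x => of(x).pipe(delay(1000))),
  // Stop after 10 values, otherwise the recursion would never end.
  take(10)
);
repeated.subscribe(x => console.log(x));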

In the code above, we have the of(1, 2, 3) Observable, whose emitted values are piped to the expand(x => of(x).pipe(delay(1000))) operator. The Observable returned by expand emits the values of of(1, 2, 3), then feeds each emitted value back into the project function, so the values are emitted repeatedly. take(10) takes the first 10 values and then stops.

This is because of(x) emits each value again without making any changes, and pipe(delay(1000)) delays each repeated group of values 1, 2, and 3 by one second.

This should result in the following output:

1  
2  
3  
1  
2  
3  
1  
2  
3  
1
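The recursion behind this output can be sketched with plain arrays. The hypothetical expandSketch helper below is an illustration, not the RxJS implementation. It assumes every projected value is delayed by the same amount, so values are processed in first-in, first-out order.

```javascript
// seeds: initial values; project: value => array of follow-up values;
// limit: how many values to take before stopping.
function expandSketch(seeds, project, limit) {
  const queue = [...seeds];
  const output = [];
  while (queue.length > 0 && output.length < limit) {
    const value = queue.shift();
    output.push(value); // every value is emitted
    queue.push(...project(value)); // and is also fed back into project
  }
  return output;
}

console.log(expandSketch([1, 2, 3], x => [x], 10));
// [1, 2, 3, 1, 2, 3, 1, 2, 3, 1]
console.log(expandSketch([1], x => [x * 2], 5));
// [1, 2, 4, 8, 16]
```

The second call shows a more typical use, where each emitted value produces a different follow-up value.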

Conclusion

The concatMapTo operator takes each source value of the Observable and maps it to the same inner Observable.

The exhaust operator takes an Observable that emits Observables, emits the values from the first inner Observable, and discards the inner Observables that arrive before it has finished emitting.

The exhaustMap operator maps source values into inner Observables, and likewise ignores new source values while the current inner Observable hasn’t completed.

Finally, the expand operator recursively projects each source value to an Observable which is then merged into the output Observable and emitted.

More Useful Rxjs Creation Operators

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at more creation operators from Rxjs, like functions that create Observables from event handlers, functions that generate Observables that emit numbers, and functions that let us conditionally subscribe to Observables.

fromEvent

The fromEvent function creates an Observable that emits event objects of the type that we specified as they occur on the event target.

It takes up to 4 arguments. The first argument is the event target, which is required. It can be a DOM event target, Node.js event emitter, jQuery-like event target, NodeList, or HTMLCollection to attach event handlers to.

The second argument is the event name that’s being emitted by the event target.

The 3rd argument is an optional argument with some options.

The last argument is an optional result selector function.

Every time the Observable is subscribed to, the event handler function will be registered to the event target on the given event type.

When the event fires, the event object will be emitted by the Observable.

The event targets are checked by duck typing. We can safely use fromEvent on an object if it matches one of the following:

  • DOM event target if it has the addEventListener and removeEventListener methods
  • Node.js event emitter if it has the addListener and removeListener methods
  • jQuery style objects if it has the on and off methods
  • DOM NodeList or HTMLCollection objects, which are lists of DOM nodes returned by methods like document.querySelectorAll or the childNodes property of a DOM node.

We can use fromEvent as follows:

import { fromEvent } from "rxjs";
const clicks = fromEvent(window, "click");  
clicks.subscribe(x => console.log(x));

The code above watches for clicks on the window object. We should see MouseEvent objects logged when we click anywhere on the browser tab.

fromEventPattern

This is similar to fromEvent , except that we pass in handler functions for adding event listeners and removing event listeners.

It takes 3 arguments. The first 2 are the add and remove handlers respectively. The remove handler is optional. The last argument is an optional result selector function for manipulating emitted values.

For example, we can use it to detect the clicks on a div with ID app as follows:

import { fromEventPattern } from "rxjs";
const app = document.querySelector("#app");

function addClickHandler(handler) {  
  app.addEventListener("click", handler);  
}

function removeClickHandler(handler) {  
  app.removeEventListener("click", handler);  
}

const clicks = fromEventPattern(addClickHandler, removeClickHandler);  
clicks.subscribe(x => console.log(x));

We should see MouseEvent objects logged when we click anywhere on a div with the ID app .

generate

The generate function creates an Observable with a stream of values. We pass in the initial state, a function with the condition for ending the emission of values, a function for iterating through the values, a result selector function for selecting the emitted results, and a scheduler object for changing the timing of the emissions.

Only the initial state is required. For example, we can create an Observable that emits the values 0 to 9 by writing:

import { generate } from "rxjs";
const generated = generate(0, x => x < 10, x => x + 1);  
generated.subscribe(  
  value => console.log(value),  
  err => {}  
);

The first argument of the generate function call has the first value to emit or the initial state. The second has the ending condition, which is less than 10. The last argument has the function that indicates how to move on and emit the next item and move towards the ending condition in the second argument.
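The three arguments play the same roles as the three parts of a for loop. The sketch below shows this with a plain JavaScript generator. generateSketch is a hypothetical helper, and unlike the RxJS function it is pull-based, so it only illustrates how the state moves forward.

```javascript
// initial: first state; condition: keep going while this returns true;
// iterate: computes the next state from the current one.
function* generateSketch(initial, condition, iterate) {
  for (let state = initial; condition(state); state = iterate(state)) {
    yield state;
  }
}

const generatedValues = [...generateSketch(0, x => x < 10, x => x + 1)];
console.log(generatedValues); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```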

interval

interval creates an Observable that emits sequential numbers in a specified interval of time.

It takes 2 optional arguments. The first is the number of milliseconds or the time unit determined by the scheduler’s clock. The default is 0.

The second argument is the scheduler to use, which defaults to async .

For example:

import { interval } from "rxjs";
const numbers = interval(1000);

creates an Observable that emits a new value every second.

of

This creates an Observable out of its arguments.

It takes any number of arguments.

For example, we can use it as follows:

import { of } from "rxjs";
of(1, 2, 3).subscribe(  
  val => console.log(val),  
  err => console.log(err),  
  () => console.log("end")  
);

range

We can use range to create an Observable that emits a sequence of numbers within the specified range.

It takes 3 optional arguments. The first is the start, which defaults to 0. The second is the number of integers to generate, which defaults to undefined. The last is the scheduler to use, which also defaults to undefined.

For example, we can use it to create an Observable which emits the numbers 1 to 20 as follows:

import { range } from "rxjs";
const numbers = range(1, 20);  
numbers.subscribe(x => console.log(x));

throwError

throwError creates an Observable that only emits an error notification.

It takes up to 2 arguments. The first is the error to emit. The second is an optional scheduler argument to let us choose the scheduler. It defaults to undefined .

We can use it as follows:

import { throwError } from "rxjs";
const numbers = throwError("error");  
numbers.subscribe(() => {}, err => console.log(err));

We subscribed to the error notification in the second argument.

timer

timer creates an Observable that starts emitting values after a specified time, and then emits ever-increasing numbers at a specified interval thereafter.

The first argument is the time that the Observable starts emitting, which defaults to 0. The number is in milliseconds.

The second argument is the period of emitting values which defaults to undefined . The number is in milliseconds.

The last argument is the scheduler to use, which defaults to undefined .

For example, we can create an Observable to emit values after 2 seconds, then every second thereafter by writing:

import { timer } from "rxjs";
const numbers = timer(2000, 1000);  
numbers.subscribe(x => console.log(x));

iif

iif lets us create an Observable that decides which Observable will be subscribed to at subscribe time.

It takes up to 3 arguments. The first is a condition function that determines which Observable is chosen. The 2nd and 3rd arguments are the Observables that are chosen when the condition is true and false respectively.

For example, we can use it as follows:

import { iif, of } from "rxjs";
let wantFoo;  
const fooBar = iif(() => wantFoo, of("foo"), of("bar"));
wantFoo = true;  
fooBar.subscribe(val => console.log(val));
wantFoo = false;
fooBar.subscribe(val => console.log(val));

In the code above, if wantFoo is true when we subscribe, then of('foo') is subscribed to. Otherwise, of('bar') is subscribed to.
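The important detail is that the condition is checked when we subscribe, not when the Observable is created. A minimal sketch of that behavior in plain JavaScript follows. iifSketch and just are hypothetical stand-ins for illustration, not RxJS functions.

```javascript
// Hypothetical stand-in for of(value): emits one value on subscribe.
const just = value => ({
  subscribe(next) {
    next(value);
  }
});

// Hypothetical stand-in for iif: picks the source inside subscribe.
const iifSketch = (condition, whenTrue, whenFalse) => ({
  subscribe(next) {
    // Evaluated at subscribe time, so later changes to the flag count.
    (condition() ? whenTrue : whenFalse).subscribe(next);
  }
});

let wantFooSketch;
const fooBarSketch = iifSketch(() => wantFooSketch, just("foo"), just("bar"));

const chosen = [];
wantFooSketch = true;
fooBarSketch.subscribe(value => chosen.push(value));
wantFooSketch = false;
fooBarSketch.subscribe(value => chosen.push(value));

console.log(chosen); // ["foo", "bar"]
```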

We can create Observables that handle DOM or Node.js events with the fromEvent creation operator.

The of operator lets us create Observables from any list of objects.

throwError only emits an error notification and does nothing else.

generate, interval, and range let us create Observables that emit number ranges.

timer lets us create timed Observables and iif lets us create conditional Observables.

Rxjs Filtering Operators — Audit and Debounce

RxJS is a library for doing reactive programming. Filtering operators are useful for controlling which of the values emitted by source Observables reach Observers.

In this article, we’ll look at some filtering operators which let us filter out emitted values in a way that we specify, including the audit, auditTime, debounce, and debounceTime operators.

audit

The audit operator ignores the values emitted by the source Observable for a period of time, then emits the most recent value. Then the process is repeated.

It takes one argument, which is the durationSelector function. It receives a value from the source Observable and uses it to compute how long to ignore emitted values.

The durationSelector function can return a promise or an Observable.

It returns the Observable that does the rate-limiting.

It’s similar to throttle, but it returns the last value from the silenced duration instead of the first.

When the duration Observable emits a value or completes, the timer is disabled. Then the most recent source value is emitted from the output Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { audit } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(audit(ev => interval(4000)));  
result.subscribe(x => console.log(x));

The code above pipes the values of the interval$ Observable into the audit operator, whose durationSelector function returns interval(4000). This means that each silencing window lasts 4 seconds: source values emitted during the window are ignored, and the most recent one is emitted when the window ends.

The effect produced would be that every 3 values are skipped.

auditTime

The auditTime operator ignores the source Observable’s values for duration milliseconds, then emits the most recent value from the source Observable and repeats the process.

It takes up to 2 arguments. One is the duration , which is the time to wait before emitting the most recent value. The duration is measured in milliseconds or the time unit determined by the optional scheduler argument.

The second is an optional scheduler to let us time the values emitted.

It returns the Observable that does the rate-limiting.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { auditTime } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(auditTime(5000));  
result.subscribe(x => console.log(x));

The code above takes the values emitted by the interval$ Observable, which emits a number every second, and pipes them to the auditTime(5000) operator. As a result, we get the most recent value emitted by interval$ about every 5 seconds.
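The windowing logic can be sketched as a pure function over timestamped events. auditTimeSketch below is a hypothetical model in plain JavaScript, not the RxJS implementation. It assumes the source stays open, so the last window is also allowed to close, and it treats an event that lands exactly on a window boundary as the start of a new window.

```javascript
// events: [{ time, value }] sorted by time. Returns the values that a
// window of `duration` would let through, with their emission times.
function auditTimeSketch(events, duration) {
  const output = [];
  let windowEnd = null; // null means no window is currently open
  let latest;
  for (const event of events) {
    if (windowEnd !== null && event.time >= windowEnd) {
      // The open window has ended: emit the most recent value seen in it.
      output.push({ time: windowEnd, value: latest });
      windowEnd = null;
    }
    if (windowEnd === null) {
      windowEnd = event.time + duration; // this event opens a new window
    }
    latest = event.value;
  }
  if (windowEnd !== null) {
    output.push({ time: windowEnd, value: latest });
  }
  return output;
}

// Seven values, one per second, like the start of interval(1000).
const auditEvents = [0, 1, 2, 3, 4, 5, 6].map(value => ({
  time: (value + 1) * 1000,
  value
}));

console.log(auditTimeSketch(auditEvents, 2500).map(event => event.value));
// [2, 5, 6]
```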

debounce

debounce emits a value from the source Observable after a particular time span has passed without another source emission.

It takes one argument, which is a durationSelector function that receives a value from the source Observable for computing the timeout duration for each source value.

The durationSelector function can return a promise or an Observable.

It returns an Observable that emits values according to the delay specified by the return value of the durationSelector function. The returned Observable may drop some values if emissions from the source happen too frequently.

For example, we can use it as follows. Given that we have the following input element:

<input type="text" />

We can write the following JavaScript code to delay the emission of the input value for 2 seconds:

import { fromEvent, interval } from "rxjs";  
import { debounce } from "rxjs/operators";

const inputs = fromEvent(document.querySelector("input"), "input");
const result = inputs.pipe(debounce(() => interval(2000)));
result.subscribe(x => console.log(x));

The input events are received by the fromEvent function, then they’re piped through the debounce operator, which uses our durationSelector function, () => interval(2000), to compute the delay.

debounceTime

Like the debounce operator, the debounceTime operator emits values from the source Observable after a delay has passed without another emission from the source Observable.

The difference is in the argument it accepts. It takes up to 2 arguments. The first is the dueTime number, which is the duration in milliseconds or the time unit specified by the optional scheduler argument for the time to delay emission of values.

The second argument is the optional scheduler argument, which defaults to async. It lets us time the emissions as we want to.

It returns an Observable that delays the emission of the source Observable by the specified dueTime and may drop some values if emissions from the source happen too frequently.

For example, we can use it as follows. Given that we have the following input element:

<input type="text" />

We can use the debounceTime operator to delay the emission of input event objects as follows:

import { fromEvent } from "rxjs";  
import { debounceTime } from "rxjs/operators";

const inputs = fromEvent(document.querySelector("input"), "input");
const result = inputs.pipe(debounceTime(2000));
result.subscribe(x => console.log(x));

The code above delays the emission of input event objects by 2 seconds, and drops any event that is followed by another one within 2 seconds.
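Which events survive can be worked out from their timestamps alone. The sketch below is a hypothetical debounceSketch function in plain JavaScript, not the RxJS implementation. It assumes the source stays open after the last event, and the sample data imitates someone typing quickly, pausing, then typing again.

```javascript
// events: [{ time, value }] sorted by time, with times in milliseconds.
function debounceSketch(events, dueTime) {
  return events
    .filter((event, index) => {
      const next = events[index + 1];
      // Keep an event only if no other event follows within dueTime.
      return !next || next.time - event.time >= dueTime;
    })
    .map(event => ({ time: event.time + dueTime, value: event.value }));
}

const typed = [
  { time: 0, value: "a" },
  { time: 500, value: "ab" },
  { time: 900, value: "abc" },
  { time: 4000, value: "abcd" }
];

console.log(debounceSketch(typed, 2000));
// [{ time: 2900, value: "abc" }, { time: 6000, value: "abcd" }]
```

Only "abc" and "abcd" get through, because each is followed by at least 2 seconds of silence.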

audit and auditTime let us ignore emitted values from the source Observable for a period of time, then emit the most recent value. Then the process is repeated.

debounce and debounceTime operators emit values from the source Observable after a delay has passed without another emission from the source Observable.