Categories
JavaScript Rxjs

Some Useful Rxjs Creation Operators

RxJS is a library for reactive programming. Creation operators build Observables from various data sources so that Observers can subscribe to them.

In this article, we’ll look at some creation operators from RxJS.

Ajax

We can use the ajax() operator to fetch response objects returned from APIs.

For example, we can use it as follows:

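import { ajax } from "rxjs/ajax";
import { map, catchError } from "rxjs/operators";
import { of } from "rxjs";

const observable = ajax(`https://api.github.com/meta`).pipe(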
  map(response => {  
    console.log(response);  
    return response;  
  }),  
  catchError(error => {  
    console.log(error);  
    return of(error);  
  })  
);

observable.subscribe(res => console.log(res));

We pass the response through the map operator, where we log it. We can also catch HTTP errors with the catchError operator, which here replaces the failed request with an Observable that emits the error object.

Also, we can use ajax.getJSON() to simplify the operation as follows:

import { ajax } from "rxjs/ajax";  
import { map, catchError } from "rxjs/operators";  
import { of } from "rxjs";

const observable = ajax.getJSON(`https://api.github.com/meta`).pipe(
  map(response => {  
    console.log(response);  
    return response;  
  }),  
  catchError(error => {  
    console.log("error: ", error);  
    return of(error);  
  })  
);

observable.subscribe(res => console.log(res));

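Note that in both examples, the callback that we pass into the map operator returns the response, so the response is what subscribers receive. With ajax(), that’s an AjaxResponse object, while ajax.getJSON() emits the parsed JSON body directly.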

It also works for POST requests:

import { ajax } from "rxjs/ajax";  
import { map, catchError } from "rxjs/operators";  
import { of } from "rxjs";

const observable = ajax({  
  url: "https://jsonplaceholder.typicode.com/posts",  
  method: "POST",  
  headers: {  
    "Content-Type": "application/json"  
  },  
  body: {  
    id: 1,  
    title: "title",  
    body: "body",  
    userId: 1  
  }  
}).pipe(  
  map(response => {
    console.log("response: ", response);
    // Return the response so that subscribers receive it instead of undefined.
    return response;
  }),
  catchError(error => {  
    console.log("error: ", error);  
    return of(error);  
  })  
);

observable.subscribe(res => console.log(res));

As we can see, we can set the headers and body of the request, so ajax can handle most kinds of HTTP requests.

Errors can also be caught with the catchError operator that we pipe in:

import { ajax } from "rxjs/ajax";  
import { map, catchError } from "rxjs/operators";  
import { of } from "rxjs";

const observable = ajax(`https://api.github.com/404`).pipe(
  map(response => {  
    console.log(response);  
    return response;  
  }),  
  catchError(error => {  
    console.log(error);  
    return of(error);  
  })  
);

observable.subscribe(res => console.log(res));

bindCallback

bindCallback converts a callback API to a function that returns an Observable.

The Observable emits whatever arguments the callback is called with.

It takes up to 3 arguments. The first is a function whose last parameter is a callback function. Whatever is passed into that callback will be emitted by the Observable.

The second argument is an optional resultSelector. We can pass in a function to select the emitted results here.

The last argument is an optional scheduler. We can pass in a scheduler if we want to change the way the callback function in the first argument is scheduled to be called.

import { bindCallback } from "rxjs";

const foo = fn => {  
  fn("a", "b", "c");  
};

const observableFn = bindCallback(foo);  
observableFn().subscribe(res => console.log(res));

Calling bindCallback(foo) returns a function, observableFn, that returns an Observable when it’s called. When we subscribe to that Observable, foo runs and the values passed into fn, which is the callback parameter of foo, are emitted.

Because fn is called with more than one argument, the values are emitted together as the array ['a', 'b', 'c'].

defer

defer lets us create an Observable lazily. The actual Observable is only created when a subscription is made, and a fresh one is created for each subscriber.

It takes one argument, which is an Observable factory function. For example, we can write:

import { defer, of } from "rxjs";
const clicksOrInterval = defer(() => {  
  return Math.random() > 0.5 ? of([1, 2, 3]) : of([4, 5, 6]);  
});  

clicksOrInterval.subscribe(x => console.log(x));

Then we have an Observable that subscribes to of([1, 2, 3]) if Math.random() returns a number bigger than 0.5, and to of([4, 5, 6]) otherwise. The choice is made again for each new subscription.

empty

empty creates an Observable that emits no items to Observers and immediately emits a complete notification.

It takes one optional argument, which is the scheduler that we want to use.

For example, we can use it as follows:

import { empty } from "rxjs";
const result = empty();  
result.subscribe(x => console.log(x));

Then we should see nothing logged.

Another example would be to emit the value 'odd' when odd numbers are emitted from the original Observable:

import { empty, interval, of } from "rxjs";  
import { mergeMap } from "rxjs/operators";

const interval$ = interval(1000);
const result = interval$.pipe(  
  mergeMap(x => (x % 2 === 1 ? of("odd") : empty()))  
);  
result.subscribe(x => console.log(x));

from

from creates an Observable from an array, an array-like object, a promise, an iterable object, or an Observable-like object.

It takes up to 2 arguments. The first is the array, array-like object, promise, iterable object, or Observable-like object to convert.

The second is an optional scheduler.

For example, we can use it as follows:

import { from } from "rxjs";
const array = [1, 2, 3];  
const result = from(array);
result.subscribe(x => console.log(x));

We can also use it to convert a promise to an Observable as follows:

import { from } from "rxjs";

const promise = Promise.resolve(1);  
const result = from(promise);
result.subscribe(x => console.log(x));

This is handy when an API returns a promise, such as the Fetch API, and we want to work with the result as an Observable.

As we can see, the creation operators are pretty useful for turning various data sources into Observables.

We have the ajax operator for getting HTTP request responses. The bindCallback function turns callback arguments into Observable data. defer lets us create Observables on the fly when something subscribes to the Observable returned by the defer operator.

Finally, we have the empty operator to create an Observable that emits nothing, and a from operator to create Observables from an array, array-like object, a promise, iterable object or Observable-like object.

More Rxjs Operators

RxJS is a library for reactive programming. Creation operators build Observables from various data sources so that Observers can subscribe to them.

In this article, we’ll look at some join creation operators to combine data from multiple Observables into one Observable. We’ll look at the merge, race and zip join creation operators, and also the buffer and bufferCount transformation operators.

Join Creation Operators

These operators combine the values emitted from multiple Observables into one Observable.

merge

The merge operator takes multiple Observables and concurrently emits all values from every given input Observable.

It takes a comma-separated list of Observables as arguments, optionally followed by a concurrency limit and a scheduler.

For example, we can use it as follows:

import { merge, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = merge(observable1, observable2);  
combined.subscribe(x => console.log(x));

Another example would be combining multiple timed Observables as follows:

import { merge, interval } from "rxjs";
const observable1 = interval(1000);  
const observable2 = interval(2000);  
const combined = merge(observable1, observable2);  
combined.subscribe(x => console.log(x));

We’ll see that observable1 emits a value first, then observable2. After that, observable1 continues to emit a value every second and observable2 emits a value every 2 seconds, with both sets of values interleaved in the output.

race

The race operator takes multiple Observables and returns an Observable that mirrors whichever of them emits an item first.

It takes a comma-separated list of Observables as arguments.

For example, we can use it as follows:

import { race, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = race(observable1, observable2);  
combined.subscribe(x => console.log(x));

We have observable1, which emits data before observable2. We should get the output:

1  
2  
3

since observable1 emits values first.

zip

The zip operator combines multiple Observables and returns an Observable whose values are calculated from the values of each of its input Observables, taken in order.

It takes a list of Observables as arguments. We can use it as follows:

import { zip, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = zip(observable1, observable2);  
combined.subscribe(x => console.log(x));

Then we get the following:

[1, 4]  
[2, 5]  
[3, 6]

We can also map each pair to an object so that values from one Observable are easier to distinguish from the other:

import { zip, of } from "rxjs";  
import { map } from "rxjs/operators";
const age$ = of(1, 2, 3);  
const name$ = of("John", "Mary", "Jane");  
const combined = zip(age$, name$);  
combined  
  .pipe(map(([age, name]) => ({ age, name })))  
  .subscribe(x => console.log(x));
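
To make the pairing rule concrete, here is a minimal plain-JavaScript sketch that models zip for inputs that have already finished emitting, with arrays standing in for Observables. The helper name zipArrays is hypothetical and isn’t part of RxJS. It assumes that, as with zip, no more groups are produced once the shortest input runs out.

```javascript
// Model of zip's pairing rule, with arrays standing in for completed Observables.
// zipArrays is a hypothetical helper, not an RxJS function.
function zipArrays(...inputs) {
  if (inputs.length === 0) return [];
  // The shortest input limits how many complete groups can be formed.
  const length = Math.min(...inputs.map(input => input.length));
  const groups = [];
  for (let i = 0; i < length; i++) {
    // The i-th value of every input is combined into one array.
    groups.push(inputs.map(input => input[i]));
  }
  return groups;
}

// Same mapping step as the example above: turn each pair into an object.
const zippedPeople = zipArrays([1, 2, 3], ["John", "Mary", "Jane"]).map(
  ([age, name]) => ({ age, name })
);
console.log(zippedPeople);
```

The model ignores timing entirely. It only shows which values end up grouped together.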

Transformation Operators

buffer

The buffer operator buffers the source Observable values until the closingNotifier emits.

It takes one argument, which is the closingNotifier. It’s an Observable that signals the buffer to be emitted on the output Observable.

For example, we can use it as follows:

import { fromEvent, timer } from "rxjs";  
import { buffer } from "rxjs/operators";
const observable = timer(1000, 1000);  
const clicks = fromEvent(document, "click");  
const buffered = observable.pipe(buffer(clicks));  
buffered.subscribe(x => console.log(x));

In the code above, we have an Observable created by the timer operator, which emits a number every second after 1 second of waiting. Then we pipe its values into the buffer operator, with the clicks Observable, which emits as clicks are made to the document, as the closingNotifier.

This means that each time we click the page, the values collected by the buffer operator since the previous click are emitted as an array. Depending on the timing of our clicks, we’ll get anything from an empty array to an array of all the values that were emitted between clicks.
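
As a rough illustration of that grouping, the plain-JavaScript sketch below models buffer on timestamped data instead of live events. bufferModel, the event shape, and the click times are all hypothetical and aren’t part of RxJS. The model assumes both inputs are sorted by time, that no two events share a timestamp, and it leaves out any values that arrive after the last click.

```javascript
// Model of buffer: timestamped source values are grouped by the times at which
// the closing notifier emits. bufferModel is a hypothetical helper, not part of RxJS.
// Assumptions: both inputs are sorted by time and no two events share a timestamp.
function bufferModel(sourceEvents, notifierTimes) {
  const emitted = [];
  let next = 0; // index of the first source event not yet buffered
  for (const closeAt of notifierTimes) {
    const current = [];
    // Collect every source value that arrived before this notifier emission.
    while (next < sourceEvents.length && sourceEvents[next].time < closeAt) {
      current.push(sourceEvents[next].value);
      next++;
    }
    emitted.push(current); // may be empty if nothing arrived in between
  }
  return emitted;
}

// timer(1000, 1000) emits 0, 1, 2, 3 at 1s, 2s, 3s, 4s; the click times are made up.
const timerEvents = [0, 1, 2, 3].map(value => ({ value, time: (value + 1) * 1000 }));
const clickTimes = [500, 2500, 2600, 4500];
console.log(JSON.stringify(bufferModel(timerEvents, clickTimes)));
// [[],[0,1],[],[2,3]]
```

The first and third clicks produce empty arrays because no timer value arrived since the previous click.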

bufferCount

bufferCount is slightly different from buffer in that it buffers the data until the buffer’s size hits the maximum bufferSize.

It takes up to 2 arguments. The first is bufferSize, which is the maximum size of each buffer. The second is startBufferEvery, an optional parameter indicating the interval, counted in source values, at which to start a new buffer.

For example, we can use it as follows:

import { fromEvent } from "rxjs";  
import { bufferCount } from "rxjs/operators";  
const clicks = fromEvent(document, "click");  
const buffered = clicks.pipe(bufferCount(10));  
buffered.subscribe(x => console.log(x));

The code above will emit an array of the buffered MouseEvent objects once we’ve clicked 10 times, since this is when 10 MouseEvent objects have been emitted by the originating Observable.
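
The effect of the two parameters is easier to see on a fixed list of values. The sketch below is a plain-JavaScript model, with an array standing in for a source that has completed. bufferCountModel is a hypothetical helper, not the RxJS operator, and it assumes that leftover non-empty buffers are flushed when the source ends.

```javascript
// Model of bufferCount(bufferSize, startBufferEvery) over a finished list of values.
// bufferCountModel is a hypothetical helper, not part of RxJS.
function bufferCountModel(values, bufferSize, startBufferEvery = bufferSize) {
  const open = [];    // buffers that are still collecting values
  const emitted = []; // buffers that have been emitted
  values.forEach((value, index) => {
    // Open a new buffer every startBufferEvery values.
    if (index % startBufferEvery === 0) open.push([]);
    // Every open buffer receives the value.
    for (const buffer of open) buffer.push(value);
    // The oldest buffer is the fullest one, so emit it once it reaches bufferSize.
    if (open.length > 0 && open[0].length === bufferSize) emitted.push(open.shift());
  });
  // Assumption: partially filled buffers are emitted when the source ends.
  for (const buffer of open) if (buffer.length > 0) emitted.push(buffer);
  return emitted;
}

console.log(JSON.stringify(bufferCountModel([1, 2, 3, 4, 5], 2)));
// [[1,2],[3,4],[5]]
console.log(JSON.stringify(bufferCountModel([1, 2, 3, 4, 5], 3, 1)));
// [[1,2,3],[2,3,4],[3,4,5],[4,5],[5]]
```

With startBufferEvery smaller than bufferSize, the buffers overlap like a sliding window. With the default, they’re back-to-back chunks.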

As we can see, the join creation operators let us combine Observables’ emitted data in many ways. We can mirror whichever Observable emits first, we can combine the emitted data in order into arrays, and we can get all the data concurrently.

Also, we can buffer an Observable’s emitted data and emit it when a given amount is buffered or when a triggering event occurs.

Rxjs Operators — Utility Operators

RxJS is a library for reactive programming. Utility operators let us observe or adjust how an Observable’s values are delivered without changing where they come from.

In this article, we’ll look at some utility operators that help us do various things, including the tap, delay, delayWhen, and dematerialize operators.

tap

The tap operator returns an Observable that lets us perform some side effects for every emission on the source Observable and then emits the same values as the source.

It takes 3 optional arguments. The first is nextOrObserver, which is either an Observer object or a callback for next. The second is an error callback, which is called for errors in the source. The last is the complete callback, which is called when the source Observable completes.

It lets us intercept each emission from the source and do something at that point by running a function. The output is the same as that of the source Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { tap } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  tap(  
    val => console.log(`value: ${val}`),  
    error => console.log(error),  
    () => console.log("complete")  
  )  
);  
result.subscribe(x => console.log(x));

Then we should get:

value: 1  
1  
value: 2  
2  
value: 3  
3  
complete

since we logged the values emitted with:

val => console.log(`value: ${val}`)

And we logged 'complete' when the of$ Observable completes by writing:

() => console.log("complete")

delay

The delay operator returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

It takes up to 2 arguments. The first is the delay, which is the delay duration in milliseconds or a Date until which the emission of the source item is delayed.

The second argument is an optional scheduler, which defaults to async. It lets us change the timing mechanism for emitting the values by the returned Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delay(1000));  
result.subscribe(x => console.log(x));

The code above delays the emission of values from the of$ Observable by a second with delay(1000).

We should get 1, 2, and 3 emitted after the delay.

Also, we can delay emission to a specific date as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
import moment from "moment";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  delay(  
    moment()  
      .add(5, "seconds")  
      .toDate()  
  )  
);  
result.subscribe(x => console.log(x));

The code above will delay the emission of values from of$ by 5 seconds from the current date and time.

The output should be unchanged.

delayWhen

The delayWhen operator returns an Observable that delays the emission from the source Observable by a given time span determined by the emission of another Observable.

It takes up to 2 arguments. The first is the delayDurationSelector function that returns an Observable for each value emitted by the source Observable. When the Observable returned by this function emits, then the value from the source Observable will emit.

The second argument is an optional subscriptionDelay. It’s an Observable that triggers the subscription to the source Observable once it emits any value.

For instance, we can use it as follows:

import { of, interval } from "rxjs";  
import { delayWhen } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delayWhen(val => interval(Math.random() * 5000)));  
result.subscribe(x => console.log(x));

The code above will delay the emission of each value emitted by the of$ Observable by a random duration of up to 5 seconds. Each value is emitted when the interval(Math.random() * 5000) Observable created for it first emits.

As a result, 1, 2 and 3 will be emitted in a random order.
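
To see why the order changes, it can help to replace the random durations with fixed ones. The plain-JavaScript sketch below is only a model: delayWhenModel and the fixedDelays table are hypothetical, not part of RxJS, and it assumes a source that emits all of its values at the same moment, as of(1, 2, 3) does.

```javascript
// Model of delayWhen for a source that emits all of its values at time 0.
// delayWhenModel is a hypothetical helper; delayFor returns each value's delay in ms.
function delayWhenModel(values, delayFor) {
  return values
    .map((value, index) => ({ value, index, at: delayFor(value) }))
    // Earlier release times come first; ties keep the source order.
    .sort((a, b) => a.at - b.at || a.index - b.index)
    .map(entry => entry.value);
}

// Made-up delays standing in for Math.random() * 5000.
const fixedDelays = { 1: 3000, 2: 1000, 3: 2000 };
console.log(delayWhenModel([1, 2, 3], value => fixedDelays[value])); // [ 2, 3, 1 ]
```

Each value is released according to its own delay, so the output order follows the delays, not the source order.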

dematerialize

The dematerialize operator returns an Observable that turns the Notification objects emitted by the source back into the emissions that they represent.

It takes no parameters.

For example, we can use it as follows:

import { of, Notification } from "rxjs";  
import { dematerialize } from "rxjs/operators";

const notifA = new Notification("N", 1);  
const notifB = new Notification("N", 2);  
const notifE = new Notification("E", undefined, new Error("error"));  
const materialized = of(notifA, notifB, notifE);  
const dematerialized = materialized.pipe(dematerialize());
dematerialized.subscribe(x => console.log(x), e => console.error(e));

Then we should get:

1  
2  
Error: error

since we converted the Notification objects back into the values and the error that they represent, which are then emitted by the returned Observable.

The second argument passed into the Notification constructor is the value to emit for a next notification. The error is the third argument of the Notification constructor.

tap returns an Observable that lets us perform some side effects for every emission on the source Observable and then emit the same values from the source.

delay returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

delayWhen returns an Observable that delays the emission from the source Observable by a given time span which is determined by the emission of another Observable.

dematerialize returns an Observable object that emits the values set in the Notification objects emitted by the source Observable.

Rxjs Operators — Throttle and Join

RxJS is a library for reactive programming. Filtering operators limit which values get through, and join operators flatten Observables that emit other Observables.

In this article, we’ll look at the throttle and join operators, including the throttle, throttleTime, combineAll and concatAll operators.

Filtering Operators

throttle

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, then repeats the process.

It takes up to 2 arguments. The first is the durationSelector, which is a function that takes a value from the source Observable and returns an Observable or Promise that determines the throttling duration for each source value.

The second argument is an optional config object to define the leading and trailing behavior. Its default value is defaultThrottleConfig.

It returns an Observable that performs the throttle operation from the source.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { throttle } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(throttle(ev => interval(5000)));  
result.subscribe(x => console.log(x));

The code above has an interval$ Observable that emits a number every second. Its values are piped into the throttle operator, which takes a function that returns the interval(5000) Observable, which emits a number every 5 seconds.

Therefore, after each value it lets through, the throttle operator ignores values from interval$ for 5 seconds, since our throttle callback function returned interval(5000).

Then we should see roughly every fifth or sixth number logged by console.log, starting with 0. The exact spacing depends on whether the source value that arrives just as the 5 seconds elapse is still dropped.

throttleTime

throttleTime emits a value from the source Observable, then ignores subsequently emitted values for duration milliseconds, then repeats the process.

It takes up to 3 arguments. The first is the duration, which is the time to wait before emitting another value after emitting the last value. It’s measured in milliseconds or the time unit of the optional scheduler.

The second argument is the optional scheduler, which defaults to async. It’s used for setting the timing of the emission.

The last argument is the config, which is optional. It defaults to defaultThrottleConfig. We can pass in an object to define the leading and trailing behavior. The default value is { leading: true, trailing: false }.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { throttleTime } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(throttleTime(5000));  
result.subscribe(x => console.log(x));

The above works like our previous throttle example, except that we change throttle(ev => interval(5000)) to throttleTime(5000), which does the same thing.

After each number that’s emitted, values from interval$ are ignored for 5 seconds.

Then we get the same numbers logged as in the example above.
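
The default leading-edge behavior can be sketched without timers by working on timestamped events. The following plain-JavaScript model is an illustration only: throttleModel, the event shape, and the sample times are hypothetical and not part of RxJS. It also assumes that an event landing exactly on the end of a throttle window is let through, which is a modelling choice and not a statement about how RxJS resolves such ties.

```javascript
// Model of leading-edge throttling over timestamped events (times in milliseconds).
// throttleModel is a hypothetical helper, not part of RxJS.
function throttleModel(events, duration) {
  const emitted = [];
  let windowEnd = -Infinity; // no throttle window is active at the start
  for (const event of events) {
    // Assumption: an event that lands exactly on the window's end is let through.
    if (event.time >= windowEnd) {
      emitted.push(event.value);
      windowEnd = event.time + duration; // later events are ignored until then
    }
  }
  return emitted;
}

const sampleEvents = [
  { time: 0, value: "a" },
  { time: 100, value: "b" },
  { time: 250, value: "c" },
  { time: 600, value: "d" },
  { time: 700, value: "e" },
  { time: 1200, value: "f" }
];
console.log(throttleModel(sampleEvents, 500)); // [ 'a', 'd', 'f' ]
```

Each emitted value opens a new 500 ms window, and everything that falls inside that window is dropped.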

Join Operators

combineAll

The combineAll operator flattens an Observable of Observables by applying combineLatest once the outer Observable completes.

It takes one optional argument, which is a project function that maps each combination of latest values to something we want.

Once the outer Observable completes, it subscribes to all collected Observables and combines their values as follows:

  • Every time an inner Observable emits, the output Observable emits, as long as every inner Observable has emitted at least one value
  • If a project function is specified, it’s called with the most recent value from each inner Observable, and whatever it returns is emitted
  • If there’s no project function, an array of the most recent values is emitted by the returned Observable

For example, we can use it as follows:

import { of } from "rxjs";  
import { map, combineAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(combineAll());  
result.subscribe(x => console.log(x));

The code above maps the of$ Observable to child Observables. It returns of("a", "b", "c") for each value emitted by of$.

Then we use combineAll to combine the latest values from each of the child Observables in higherOrderObservable into an array. Then each of these arrays is emitted by the result Observable.

Then we get:

["c", "c", "a"]  
["c", "c", "b"]  
["c", "c", "c"]

as the console.log output. The first 2 Observables emitted all of their values and completed before the last one was subscribed to, so we get their latest value, 'c', from both. Then, as the third one emits, the last value in the array is updated, so we get 'a', 'b' and 'c' respectively from it.
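
The ordering above follows from the inner Observables emitting synchronously, one after another. As a rough illustration, the plain-JavaScript sketch below models that case with arrays standing in for the inner Observables. combineLatestSyncModel is a hypothetical helper, not an RxJS function, and it only covers sources that emit all of their values synchronously when subscribed.

```javascript
// Model of combining the latest values of synchronous sources, subscribed in order.
// combineLatestSyncModel is a hypothetical helper, not part of RxJS.
function combineLatestSyncModel(sources) {
  const NONE = Symbol("no value yet");
  const latest = sources.map(() => NONE);
  const emitted = [];
  sources.forEach((values, index) => {
    // A synchronous source delivers all of its values before the next one is subscribed.
    for (const value of values) {
      latest[index] = value;
      // Nothing is emitted until every source has produced at least one value.
      if (!latest.includes(NONE)) emitted.push([...latest]);
    }
  });
  return emitted;
}

const innerValues = ["a", "b", "c"];
console.log(JSON.stringify(combineLatestSyncModel([innerValues, innerValues, innerValues])));
// [["c","c","a"],["c","c","b"],["c","c","c"]]
```

Only the third source triggers output, because the first two have already finished by the time every slot holds a value.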

concatAll

The concatAll operator flattens a higher-order Observable by concatenating the inner Observables in order. It subscribes to each inner Observable only after the previous one completes.

It takes no parameters and returns an Observable that emits the values from all inner Observables concatenated.

For example, we can use it as follows:

import { of } from "rxjs";  
import { map, concatAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(concatAll());  
result.subscribe(x => console.log(x));

Each value emitted by the of$ Observable is mapped to an of("a", "b", "c") Observable.

Then the 3 inner of("a", "b", "c") Observables in higherOrderObservable have their emitted values concatenated in order by concatAll.

In the end, we get:

a  
b  
c  
a  
b  
c  
a  
b  
c

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable and repeats.

throttleTime emits a value from the source Observable then ignores subsequently emitted values for a given amount of time and repeats.

combineAll flattens an Observable of Observables by combining the latest values of the inner Observables once the outer Observable completes.

The concatAll operator combines the emitted values of inner Observables in order.

Rxjs Operators — More Utility Operators

RxJS is a library for reactive programming. Utility operators help us inspect and control how values are delivered to Observers.

In this article, we’ll look at some utility operators that help us do various things, including the materialize, observeOn, subscribeOn, timeInterval and timestamp operators.

materialize

The materialize operator maps all the emitted values from the source Observable into Notification objects.

It takes no arguments and returns an Observable that emits Notification objects with the values from the source Observable as values of the value property of the Notification object.

For example, we can use it as follows:

import { of } from "rxjs";  
import { materialize } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const materialized = of$.pipe(materialize());  
materialized.subscribe(x => console.log(x));

The code above takes the values of the of$ Observable, wraps each in a Notification object, and emits it. A final Notification of kind "C" marks the completion.

Then we’ll get the following objects:

{  
  "kind": "N",  
  "value": 1,  
  "hasValue": true  
}  
{  
  "kind": "N",  
  "value": 2,  
  "hasValue": true  
}  
{  
  "kind": "N",  
  "value": 3,  
  "hasValue": true  
}  
{  
  "kind": "C",  
  "hasValue": false  
}

It also emits errors as Notification objects when they occur:

import { of } from "rxjs";  
import { materialize, map } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const upperCase = of$.pipe(map(x => x.toUpperCase()));  
const materialized = upperCase.pipe(materialize());  
materialized.subscribe(x => console.log(x));

Then we get:

{  
  "kind": "E",  
  "error": {},  
  "hasValue": false  
}

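since each x is a number emitted by the source Observable, and numbers have no toUpperCase method. The error property looks empty in this output because an Error object’s message and stack aren’t included when it’s serialized to JSON.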
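
The two outputs above can be summarized with a small plain-JavaScript model. materializeModel is a hypothetical helper, not part of RxJS, and it returns plain objects shaped like the logged ones, not actual Notification instances. It assumes a synchronous source, represented by an array, with an optional project function standing in for the map step.

```javascript
// Model of materialize: wrap each outcome of a source in a notification-like object.
// materializeModel is a hypothetical helper; it returns plain objects,
// not RxJS Notification instances.
function materializeModel(values, project = x => x) {
  const notifications = [];
  try {
    for (const value of values) {
      notifications.push({ kind: "N", value: project(value), hasValue: true });
    }
    // Reaching the end of the values corresponds to a complete notification.
    notifications.push({ kind: "C", hasValue: false });
  } catch (error) {
    // An error ends the sequence, so no complete notification follows it.
    notifications.push({ kind: "E", error, hasValue: false });
  }
  return notifications;
}

// Print only the kinds to keep the output short.
console.log(materializeModel([1, 2, 3]).map(n => n.kind).join("")); // NNNC
console.log(
  materializeModel([1, 2, 3], x => x.toUpperCase()).map(n => n.kind).join("")
); // E
```

In the second call, the very first value already throws, so the only notification is the error one, matching the single object logged above.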

observeOn

The observeOn operator returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.

It takes up to 2 arguments. The first is the scheduler that we’ll use to reschedule notifications from the source Observable.

The second argument is the optional delay argument, which defaults to 0. It’s the number of milliseconds that represents the delay for every notification that’s rescheduled.

We shouldn’t use observeOn on source Observables that emit lots of values synchronously. Also, the delay operator is better for delaying the emission of values from the source Observable.

For example, we can use it as follows:

import { interval, animationFrameScheduler } from "rxjs";  
import { observeOn, take } from "rxjs/operators";
const intervals = interval(10).pipe(take(10));  
const animationInterval = intervals.pipe(observeOn(animationFrameScheduler));  
animationInterval.subscribe(x => console.log(x));

We re-emitted the values from the intervals Observable on the animationFrameScheduler, so each value is delivered just before the browser repaints, which helps with smoothing out changes for animations.

subscribeOn

The subscribeOn operator returns an Observable that subscribes Observers to the source Observable asynchronously, using the specified scheduler.

It takes up to 2 arguments. The first is the scheduler to perform the subscription action on, and the second is an optional delay argument, which defaults to 0.

For example, we can use it as follows:

import { of, merge, asyncScheduler } from "rxjs";  
import { subscribeOn } from "rxjs/operators";

const a = of("a", "b", "c").pipe(subscribeOn(asyncScheduler));  
const b = of(5, 6, 7, 8, 9);  
merge(a, b).subscribe(console.log);

Then we get:

5  
6  
7  
8  
9  
a  
b  
c

from the console.log since a is converted to an asynchronous Observable by subscribeOn(asyncScheduler), while b still emits values synchronously.

Asynchronous emissions are queued after synchronous ones. Therefore, the values are emitted later than the synchronous ones.

timeInterval

The timeInterval operator returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value. The time difference is calculated with the scheduler’s now() method to get the current time for each emission.

The scheduler defaults to async, so by default, the time is measured in milliseconds.

It takes one optional argument, which is the scheduler used to get the current time.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { timeInterval, take } from "rxjs/operators";

const a = interval(1000).pipe(take(3));  
a.pipe(timeInterval()).subscribe(  
  value => console.log(value),  
  err => console.log(err)  
);

The a Observable’s emitted values are piped to the timeInterval operator, which emits an object with the value from the source Observable along with the time interval between the current and previous emitted value.

Then we get:

{  
  "value": 0,  
  "interval": 1001  
}  
{  
  "value": 1,  
  "interval": 1000  
}  
{  
  "value": 2,  
  "interval": 999  
}

from the console.log. The exact intervals vary slightly from run to run.
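
The relationship between timestamps and intervals can be sketched in plain JavaScript. timeIntervalModel is a hypothetical helper, not part of RxJS, and the sample timestamps are made up so that the result matches the output above. The model assumes that the first interval is measured from the moment of subscription, passed in as start.

```javascript
// Model of timeInterval: pair each value with the time elapsed since the previous one.
// timeIntervalModel is a hypothetical helper; `start` stands for the subscription time.
function timeIntervalModel(stamped, start) {
  let previous = start;
  return stamped.map(({ value, timestamp }) => {
    const interval = timestamp - previous;
    previous = timestamp; // the next interval is measured from this emission
    return { value, interval };
  });
}

// Made-up emission times in milliseconds, relative to a subscription at time 0.
const stampedValues = [
  { value: 0, timestamp: 1001 },
  { value: 1, timestamp: 2001 },
  { value: 2, timestamp: 3000 }
];
console.log(JSON.stringify(timeIntervalModel(stampedValues, 0)));
// [{"value":0,"interval":1001},{"value":1,"interval":1000},{"value":2,"interval":999}]
```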

timestamp

The timestamp operator returns an Observable that adds a timestamp to each item emitted by the source Observable to indicate when it was emitted.

It takes an optional scheduler argument which defaults to async.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { timestamp, take } from "rxjs/operators";

const a = interval(1000).pipe(take(3));  
a.pipe(timestamp()).subscribe(  
  value => console.log(value),  
  err => console.log(err)  
);

The code above takes the items emitted by the a Observable and pipes them to the timestamp operator, which emits an object that has the value emitted by the source Observable along with the timestamp of when it was emitted.

Then we get:

{  
  "value": 0,  
  "timestamp": 1576632836715  
}  
{  
  "value": 1,  
  "timestamp": 1576632837715  
}  
{  
  "value": 2,  
  "timestamp": 1576632838715  
}

The materialize operator maps all the emitted values from the source Observable into Notification objects.

observeOn returns an Observable that re-emits all notifications from the source Observable with the specified scheduler.

subscribeOn returns an Observable that subscribes Observers to the source Observable asynchronously, using the specified scheduler.

timeInterval returns an Observable that emits an object containing the current value and the time that’s passed between emitting the current value and the previous value.

timestamp returns an Observable that adds a timestamp to each item emitted by the source Observable to indicate when it was emitted.