Using RxJS Join Creation Operators to Combine Observable Data

RxJS is a library for reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some join creation operators to combine data from multiple Observables into one Observable. We’ll look at the combineLatest, concat, and forkJoin operators.

combineLatest

We can use combineLatest to combine multiple Observables into one whose values are calculated from the latest values of each of its input Observables.

It takes 2 or more Observables as arguments, or an array of Observables as a single argument. It returns an Observable that emits arrays containing the latest value of each Observable that was passed in.

combineLatest also takes an optional project function, which receives the values that would normally be emitted by the resulting Observable. Whatever we return from that function is emitted instead.

combineLatest works by subscribing to each Observable in order. Whenever any of them emits, it collects the most recent value from each Observable into an array, and the returned Observable emits that array.

To ensure that the output array always has the same length, combineLatest waits for all input Observables to emit at least once before it starts emitting results. If some Observable emits several values before the others have emitted anything, all but its most recent value will be lost.

If some Observable completes without emitting, then the returned Observable will also complete without emitting anything, since there is no value from that Observable to combine.

If at least one Observable was passed into combineLatest and all of them emitted something, then the returned Observable will complete when all the combined streams complete. Until then, the Observables that completed earlier keep contributing their last emitted value to each output array.

For example, we can use it as follows:

import { combineLatest, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = combineLatest(observable1, observable2);  
combined.subscribe(value => console.log(value));

Then we get:

[3, 4]  
[3, 5]  
[3, 6]

since observable1 emitted all its values before observable2 did.

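We can also do some calculations with the combined values. Instead of passing a project function, the example below pipes the combined Observable through the map operator: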

import { combineLatest, of } from "rxjs";  
import { map } from "rxjs/operators";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const combined = combineLatest(observable1, observable2).pipe(  
  map(([a, b]) => a + b)  
);  
combined.subscribe(value => console.log(value));

The code above sums each pair of values, so we get:

7  
8  
9

These are the sums of the pairs that we got before.

concat

We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits the values from each Observable that was passed in.

It works by subscribing to them one at a time and merging the results in the output Observable. We can pass in an array of Observables or put them directly as arguments. Passing in an empty array will result in an Observable that completes immediately.

concat doesn’t affect the input Observables in any way. When an Observable completes, it’ll subscribe to the next one and emit its values. This will be repeated until the operator runs out of Observables.

In contrast, the merge operator subscribes to all the Observables at once and outputs their values concurrently.

If some input Observable never completes, concat will also never complete, and the Observables that follow it will never be subscribed to. If some Observable completes without emitting any values, then it’ll be invisible to concat.

If any Observable in the chain errors, then the returned Observable will error immediately. Observables that would be subscribed to after the one that errors will never be subscribed to.

We can pass in the same Observable more than once to subscribe to it repeatedly.

For example, we can use it as follows:

import { concat, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const concatted = concat(observable1, observable2);  
concatted.subscribe(value => console.log(value));

Then we get:

1  
2  
3  
4  
5  
6

as we expect.

forkJoin

forkJoin accepts either an array of Observables or a dictionary (an object) of Observables. It emits an array of values in the exact same order as the passed array, or a dictionary of values in the same shape as the passed dictionary.

The returned Observable waits for all the input Observables to complete and then emits the last value emitted by each of them. For example, we can write:

import { forkJoin, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const joined = forkJoin(observable1, observable2);  
joined.subscribe(value => console.log(value));

Then we get [3, 6].

We can also pass in an object with Observables as properties:

import { forkJoin, of } from "rxjs";
const observable1 = of(1, 2, 3);  
const observable2 = of(4, 5, 6);  
const joined = forkJoin({ observable1, observable2 });  
joined.subscribe(value => console.log(value));

Then we get:

{observable1: 3, observable2: 6}

Conclusion

The combineLatest, concat, and forkJoin operators are very useful for combining emitted data from multiple Observables.

With combineLatest, we can combine emitted data from multiple Observables and get arrays of values that are formed by the latest values emitted by each Observable that we passed in.

The concat operator subscribes to each Observable that we passed in sequentially and returns an Observable that emits the values from each one in order. If an error occurs in any Observable, the error will be emitted by the returned Observable.

Finally, the forkJoin operator returns an Observable that gets the last value from each Observable once they have all completed, and emits those values as an object or an array depending on whether we passed in a dictionary of Observables or an array of Observables.
