Categories
JavaScript Rxjs

Rxjs Operators — More Join Operators

RxJS is a library for doing reactive programming. Join operators are useful for combining the values emitted by several Observables into a single Observable.

In this article, we’ll look at some join operators, including exhaust, mergeAll, startWith and withLatestFrom.

exhaust

The exhaust operator converts higher-order Observables into first-order Observables by dropping inner Observables that arrive while the previous inner Observable hasn’t completed yet.

It takes no parameters and returns an Observable that propagates values from the first inner Observable exclusively until it completes. Only then will it subscribe to the next inner Observable that arrives.

For example, we can use it as follows:

import { of, interval } from "rxjs";  
import { map, exhaust, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => interval(500).pipe(take(3))));  
const result = higherOrderObservable.pipe(exhaust());  
result.subscribe(x => console.log(x));

The code above maps each value from the of$ Observable to an interval(500).pipe(take(3)) Observable, which emits the numbers 0 to 2, one every half a second.

Then we pipe the higher-order Observable to the exhaust operator. Since of$ emits all 3 values synchronously, the second and third interval(500).pipe(take(3)) Observables arrive while the first one is still running, so they are dropped.

Then we get:

0  
1  
2

outputted from console.log .
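The dropping rule can be hard to see from timed output alone. The following is a minimal sketch in plain JavaScript, not RxJS itself, that models the rule. Each inner Observable is represented by a hypothetical record; the names exhaustModel, arrivesAt and duration are assumptions made for illustration only.

```javascript
// Plain-JavaScript model of the exhaust rule; not the RxJS implementation.
// Each inner record is hypothetical: { arrivesAt, duration, values }.
// The records are assumed to be sorted by arrival time.
function exhaustModel(inners) {
  const output = [];
  let busyUntil = -Infinity; // time at which the active inner completes
  for (const inner of inners) {
    // An inner that arrives while another is still running is dropped.
    if (inner.arrivesAt < busyUntil) continue;
    busyUntil = inner.arrivesAt + inner.duration;
    output.push(...inner.values);
  }
  return output;
}

// Three inners arrive at time 0, as with of(1, 2, 3); each runs for 1500 ms.
const inners = [0, 0, 0].map(arrivesAt => ({
  arrivesAt,
  duration: 1500,
  values: [0, 1, 2]
}));
console.log(exhaustModel(inners)); // only the first inner's values survive
```

If a fourth record arrived at time 2000, after the first one had finished, the model would let its values through as well.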

mergeAll

mergeAll converts higher-order Observables to first-order Observables that concurrently deliver all values emitted by the inner Observables.

It takes the optional concurrent argument, which defaults to Number.POSITIVE_INFINITY, for the maximum number of inner Observables being subscribed to concurrently.

mergeAll subscribes to all inner Observables within higher-order Observables and delivers all the values from them on the output Observable. The returned output Observable only completes when all inner Observables are completed.

Any error emitted by an inner Observable is immediately emitted as an error by the returned Observable.

For instance, we can use it as follows:

import { of } from "rxjs";  
import { map, mergeAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(mergeAll());  
result.subscribe(x => console.log(x));

The code above maps each emitted value from the of$ Observable to an of("a", "b", "c") Observable.

Then the mergeAll operator subscribes to each of the of("a", "b", "c") Observables and emits the values from each of them.

Then we get:

a  
b  
c  
a  
b  
c  
a  
b  
c

from console.log .

We can also change the concurrency by passing in a number to the mergeAll operator.

For example, we can write:

import { of, interval } from "rxjs";  
import { map, mergeAll, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(  
  map(val => interval(1000).pipe(take(2)))  
);  
const result = higherOrderObservable.pipe(mergeAll(1));  
result.subscribe(x => console.log(x));

to make mergeAll subscribe to each child Observable returned from the map operator’s callback sequentially, which will get us:

0  
1  
0  
1  
0  
1

from the console.log .

Or we can change 1 to 5 as follows:

import { of, interval } from "rxjs";  
import { map, mergeAll, take } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(  
  map(val => interval(1000).pipe(take(2)))  
);  
const result = higherOrderObservable.pipe(mergeAll(5));  
result.subscribe(x => console.log(x));

Then we get:

0  
0  
0  
1  
1  
1

outputted from console.log as they’re subscribed to concurrently. A browser console may collapse the repeated lines into (3) 0 and (3) 1.
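To compare the two concurrency settings side by side without waiting on timers, here is a rough plain-JavaScript model. It is a sketch, not the RxJS implementation: mergeAllModel is a hypothetical name, each inner Observable is represented by an array, and it assumes every subscribed inner emits exactly one value per tick.

```javascript
// Plain-JavaScript model of mergeAll's concurrent argument; not RxJS itself.
// Assumes concurrent >= 1 and that each subscribed inner emits one value per tick.
function mergeAllModel(inners, concurrent = Infinity) {
  const waiting = inners.map(values => [...values]); // not yet subscribed
  const active = []; // currently subscribed
  const output = [];
  while (waiting.length > 0 || active.length > 0) {
    // Subscribe to waiting inners while there are free slots.
    while (active.length < concurrent && waiting.length > 0) {
      active.push(waiting.shift());
    }
    // One tick: every active inner emits its next value.
    for (const inner of active) {
      if (inner.length > 0) output.push(inner.shift());
    }
    // Completed inners free their slots.
    for (let i = active.length - 1; i >= 0; i--) {
      if (active[i].length === 0) active.splice(i, 1);
    }
  }
  return output;
}

const threeInners = [[0, 1], [0, 1], [0, 1]];
console.log(mergeAllModel(threeInners, 1)); // one inner at a time
console.log(mergeAllModel(threeInners, 5)); // all three at once
```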

startWith

startWith returns an Observable that emits the items we specify before it begins to emit items from the source Observable.

It takes one or more arguments, which are the items that we want the returned Observable to emit before the values from the source Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { startWith } from "rxjs/operators";

of(1)  
  .pipe(startWith("foo", "bar"))  
  .subscribe(x => console.log(x));

Then we get:

foo  
bar  
1

as the console.log output.

withLatestFrom

The withLatestFrom operator combines the source Observable with other Observables to create an Observable that emits values calculated from the latest values of each, only when the source emits.

It takes one or more arguments, which are the Observables to combine the source’s values with.

For example, we can use it as follows:

import { fromEvent, interval } from "rxjs";  
import { withLatestFrom } from "rxjs/operators";

const clicks = fromEvent(window, "click");  
const timer = interval(1000);  
const result = clicks.pipe(withLatestFrom(timer));  
result.subscribe(x => console.log(x));

We track clicks on the window with fromEvent. Then we combine each click with the latest value emitted from the timer Observable by using the withLatestFrom operator.

In the end, the result Observable will emit arrays that have the MouseEvent object from the clicks Observable as the first value and the number from the timer Observable as the second value.
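Because the example above needs a browser and real clicks, the pairing rule is easier to inspect with a small plain-JavaScript model. This is only a sketch: withLatestFromModel and the timeline records are hypothetical, and the events are assumed to be listed in the order they happen.

```javascript
// Plain-JavaScript model of withLatestFrom; not the RxJS implementation.
// events: hypothetical ordered timeline of { from: "source" | "other", value }.
function withLatestFromModel(events) {
  const output = [];
  let hasLatest = false;
  let latest;
  for (const event of events) {
    if (event.from === "other") {
      // The other Observable only updates the remembered value.
      latest = event.value;
      hasLatest = true;
    } else if (hasLatest) {
      // Only a source emission produces output.
      output.push([event.value, latest]);
    }
    // Source emissions before "other" has emitted produce nothing.
  }
  return output;
}

const timeline = [
  { from: "source", value: "click A" },
  { from: "other", value: 0 },
  { from: "other", value: 1 },
  { from: "source", value: "click B" },
  { from: "source", value: "click C" },
  { from: "other", value: 2 }
];
console.log(withLatestFromModel(timeline));
```

In this model, "click A" is lost because the other stream had not emitted yet, and both later clicks are paired with 1.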

The exhaust operator converts higher-order Observables into first-order Observables by dropping inner Observables that arrive while the previous inner Observable hasn’t completed.

mergeAll converts higher-order Observables to first-order Observables that concurrently deliver all values emitted by the inner Observables.

startWith returns an Observable that emits the items we specify before the source Observable begins emitting values.

The withLatestFrom operator combines the source Observable with other Observables and returns an Observable that emits the latest values from each whenever the source emits.


Rxjs Operators — Conditional and Booleans

RxJS is a library for doing reactive programming. Conditional and boolean operators are useful for checking whether the values emitted by an Observable meet a condition.

In this article, we’ll look at the defaultIfEmpty, every, find, findIndex and isEmpty operators.

defaultIfEmpty

The defaultIfEmpty operator returns an Observable that emits a value if the source Observable completes without emitting anything. Otherwise, the values from the source Observable are emitted.

It takes one optional value which is the defaultValue . The default for this is null . It’s the value that’ll be emitted if the source Observable is empty.

For example, we can use it as follows:

import { of } from "rxjs";  
import { defaultIfEmpty } from "rxjs/operators";  
of()  
  .pipe(defaultIfEmpty(1))  
  .subscribe(x => console.log(x));

We have an empty of() Observable, which will emit nothing before completing, so we’ll see 1 emitted.

every

The every operator returns an Observable that emits whether or not every item of the source satisfies the condition specified.

It takes up to 2 arguments. The first is a predicate function to determine if the item emitted by the source Observable meets a specified condition.

The second argument is the thisArg , which is optional and defaults to undefined . We set this to set the value of this in the predicate function.

We can use it as follows:

import { of } from "rxjs";  
import { every } from "rxjs/operators";  
const of$ = of(1, 2, 3, 4, 5, 6);  
const result = of$.pipe(every(val => val % 2 === 0));  
result.subscribe(x => console.log(x));

The code above checks if every value of the of$ Observable is an even number since we passed in val => val % 2 === 0 as the predicate function.

The result Observable emits false since not every number is even in the of$ Observable.

find

The find operator returns an Observable that emits the first value emitted by the source Observable that matches the condition.

It takes up to 2 arguments. The first is a predicate function that tests each value emitted by the source Observable against a condition.

The second argument is the thisArg , which is optional and defaults to undefined . We set this to set the value of this in the predicate function.

We can use it as follows:

import { of } from "rxjs";  
import { find } from "rxjs/operators";  
const of$ = of(1, 2, 3, 4, 5, 6);  
const result = of$.pipe(find(val => val % 2 === 0));  
result.subscribe(x => console.log(x));

We should get 2 since it’s the first even number emitted by of$ , since we specified that val => val % 2 === 0 is the predicate that we want to check.

findIndex

The findIndex operator returns an Observable that emits the index of the first value emitted by the source Observable that matches the condition in the predicate function.

It takes up to 2 arguments. The first is a predicate function that tests each value emitted by the source Observable against a condition.

The second argument is the thisArg , which is optional and defaults to undefined . We set this to set the value of this in the predicate function.

We can use it as follows:

import { of } from "rxjs";  
import { findIndex } from "rxjs/operators";  
const of$ = of(1, 2, 3, 4, 5, 6);  
const result = of$.pipe(findIndex(val => val % 2 === 0));  
result.subscribe(x => console.log(x));

We should get 1 since it’s the first index with an even number emitted by of$ , since we specified that val => val % 2 === 0 is the predicate that we want to check.
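The difference between find and findIndex, including what happens when nothing matches, can be sketched with a plain-JavaScript model. This is not RxJS code: findModel is a hypothetical helper that returns both results at once so they can be compared.

```javascript
// Plain-JavaScript model of find and findIndex; not the RxJS implementation.
// Returns what find would emit (value) and what findIndex would emit (index).
function findModel(values, predicate) {
  for (let index = 0; index < values.length; index++) {
    // Stop at the first match; later values are never tested.
    if (predicate(values[index], index)) {
      return { value: values[index], index };
    }
  }
  // No match: find emits undefined and findIndex emits -1.
  return { value: undefined, index: -1 };
}

const isEven = val => val % 2 === 0;
console.log(findModel([1, 2, 3, 4, 5, 6], isEven)); // first even value and its index
console.log(findModel([1, 3, 5], isEven)); // nothing matches
```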

isEmpty

The isEmpty operator returns an Observable that emits false if the source Observable emits any values. Otherwise, true is emitted by the returned Observable.

It takes no arguments.

For example, we can use it as follows:

import { of } from "rxjs";  
import { isEmpty } from "rxjs/operators";  
const of$ = of();  
const result = of$.pipe(isEmpty());  
result.subscribe(x => console.log(x));

Then we should get true since of$ emits nothing.

The defaultIfEmpty operator returns an Observable that emits a default value if the source Observable emits nothing or emits the values of the source Observable if it emits something.

The every operator returns an Observable that emits whether or not every item of the source satisfies the condition specified.

The find and findIndex operators both search for the first value emitted by the source Observable that meets a condition, but find emits the object that meets the condition and findIndex emits the index of the object.

The isEmpty operator returns an Observable that emits true if the source Observable emits nothing and false otherwise.


Rxjs Operators — Calculation and Aggregation

RxJS is a library for doing reactive programming. Calculation and aggregation operators are useful for combining all the values emitted by an Observable into a single result.

In this article, we’ll look at some calculation and aggregation operators including count, max, min and reduce.

count

The count operator returns an Observable that counts the number of emissions on the source and emits the number when the source completes.

It takes an optional predicate function, which returns a boolean that decides whether a value is counted.

The predicate has 3 parameters: the value emitted by the source Observable, the index, which is the zero-based index of the value from the source Observable, and the source, which is the source Observable itself.

For example, we can use it as follows to count the total number of emitted values:

import { of } from "rxjs";  
import { count } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6);  
const result = of$.pipe(count());  
result.subscribe(x => console.log(x));

The count operator without a predicate function passed in will count all emissions from the source Observable.

This should get us 6 since we have 6 values in the of$ Observable.

We can also pass in a predicate function as follows:

import { of } from "rxjs";  
import { count } from "rxjs/operators";
const of$ = of(1, 2, 3, 4, 5, 6);  
const result = of$.pipe(count(val => val % 2 === 0));  
result.subscribe(x => console.log(x));

Then we should get 3 since we’re only counting the even numbers emitted from of$.
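The examples above only use the value parameter of the predicate. As a rough illustration of the index parameter as well, here is a plain-JavaScript model of the counting rule. It is a sketch rather than RxJS code, and countModel is a hypothetical name.

```javascript
// Plain-JavaScript model of count; not the RxJS implementation.
// Without a predicate every value is counted.
function countModel(values, predicate = () => true) {
  let total = 0;
  values.forEach((value, index) => {
    if (predicate(value, index)) total += 1;
  });
  return total; // emitted once, when the source completes
}

const numbers = [1, 2, 3, 4, 5, 6];
console.log(countModel(numbers)); // counts everything
console.log(countModel(numbers, val => val % 2 === 0)); // counts even values
console.log(countModel(numbers, (val, index) => index >= 4)); // counts by position
```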

max

The max operator takes the values from a source Observable that emits numbers, or items that can be compared with a comparer function, and returns an Observable that emits the largest one.

It takes an optional comparer function, which we can use to compare the value of 2 items.

For example, we can use it to get the highest number emitted from an Observable as follows:

import { of } from "rxjs";  
import { max } from "rxjs/operators";
of(2, 3, 4, 100)  
  .pipe(max())  
  .subscribe(x => console.log(x));

We should get 100 since 100 is the biggest number in the of(2, 3, 4, 100) Observable.

Also, we can pass in a function to the max operator to get the largest value from a list of objects as follows:

import { of } from "rxjs";  
import { max } from "rxjs/operators";

const people = [  
  { age: 17, name: "Joe" },  
  { age: 25, name: "Jane" },  
  { age: 19, name: "Mary" }  
];

of(...people)  
  .pipe(max((a, b) => a.age - b.age))  
  .subscribe(x => console.log(x));

Then we should get:

{age: 25, name: "Jane"}

from the console.log .

The comparer function works like the callback of the array’s sort method: it returns a positive number if the first item is larger, a negative number if it is smaller, and 0 if they are the same. The item that compares as the largest is emitted.

min

The min operator is the opposite of the max operator. It gets the smallest value from the source Observable.

The arguments are the same as the max operator.

We can use it like the max operator as follows:

import { of } from "rxjs";  
import { min } from "rxjs/operators";
of(2, 3, 4, 100)  
  .pipe(min())  
  .subscribe(x => console.log(x));

Then we get 2.

Like the max operator, we can use the min operator with a comparer function as follows:

import { of } from "rxjs";  
import { min } from "rxjs/operators";  
const people = [  
  { age: 17, name: "Joe" },  
  { age: 25, name: "Jane" },  
  { age: 19, name: "Mary" }  
];  
of(...people)  
  .pipe(min((a, b) => a.age - b.age))  
  .subscribe(x => console.log(x));

Then we get:

{age: 17, name: "Joe"}

The comparer function works the same way as the one we pass into the max operator, except that the item that compares as the smallest is emitted instead of the largest.
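One way to picture how a comparer picks a winner is a pairwise reduction that keeps whichever item wins each comparison. The sketch below models that idea in plain JavaScript. maxModel, minModel and byAge are hypothetical names, and the model assumes a non-empty list.

```javascript
// Plain-JavaScript model of max and min with a comparer; not RxJS itself.
// Assumes values is non-empty.
const byAge = (a, b) => a.age - b.age;

function maxModel(values, comparer) {
  // Keep x only when it compares as larger than y.
  return values.reduce((x, y) => (comparer(x, y) > 0 ? x : y));
}

function minModel(values, comparer) {
  // Keep x only when it compares as smaller than y.
  return values.reduce((x, y) => (comparer(x, y) < 0 ? x : y));
}

const people = [
  { age: 17, name: "Joe" },
  { age: 25, name: "Jane" },
  { age: 19, name: "Mary" }
];
console.log(maxModel(people, byAge).name); // oldest person
console.log(minModel(people, byAge).name); // youngest person
```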

reduce

The reduce operator applies an accumulator function over all the values emitted by the source Observable to combine them into a single value, which is emitted by the returned Observable.

It takes up to 2 arguments. The first is the accumulator function, which is the function we use to combine the values.

The second argument is an optional seed value, which is the initial value of the accumulation.

For example, we can use it as follows:

import { of } from "rxjs";  
import { reduce } from "rxjs/operators";  
of(2, 3, 4, 100)  
  .pipe(reduce((a, b) => a + b, 0))  
  .subscribe(x => console.log(x));

The code above sums up the values from the of(2, 3, 4, 100) Observable and emits 109, which is the sum of 2, 3, 4, and 100.

a is the value accumulated so far, which starts at the seed 0, and b is the latest value emitted from the source Observable.
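To make the roles of the accumulator's two parameters and of the seed concrete, here is a small plain-JavaScript model that records every intermediate result. It is a sketch, not the RxJS implementation; reduceModel and its steps field are hypothetical. When no seed is given, the first value is used as the starting accumulation.

```javascript
// Plain-JavaScript model of reduce; not the RxJS implementation.
// The seed is optional, so it is collected with a rest parameter.
function reduceModel(values, accumulator, ...seedArg) {
  const hasSeed = seedArg.length > 0;
  // Without a seed, the first value becomes the initial accumulation.
  let acc = hasSeed ? seedArg[0] : values[0];
  const steps = [];
  for (let i = hasSeed ? 0 : 1; i < values.length; i++) {
    acc = accumulator(acc, values[i]); // acc so far, then the latest value
    steps.push(acc);
  }
  return { result: acc, steps }; // only result would be emitted
}

const add = (a, b) => a + b;
console.log(reduceModel([2, 3, 4, 100], add, 0)); // with a seed of 0
console.log(reduceModel([2, 3, 4, 100], add)); // without a seed
```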

The count operator returns an Observable that counts the number of values a source Observable emits, or the number of emitted values that meet a condition we specify.

min and max get the minimum and maximum value, respectively, from the numbers emitted or according to a comparer function.

The reduce operator returns an Observable that takes the values from the source Observable and combines them into one in a way that we specify in the function we pass into it.


Rxjs Filtering Operators — Skip and Take

RxJS is a library for doing reactive programming. Filtering operators are useful for choosing which values from a source Observable get emitted to Observers.

In this article, we’ll look at more filtering operators, including skipUntil, skipWhile, take, takeLast, takeUntil, and takeWhile operators.

skipUntil

The skipUntil operator returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

It takes one argument, which is the notifier Observable. When this emits an item, the values emitted by the source Observable from then on will be emitted by the returned Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { skipUntil, take } from "rxjs/operators";

const numbers = interval(1000).pipe(  
  take(10),  
  skipUntil(interval(5000).pipe(take(1)))  
);  
numbers.subscribe(x => console.log(x));

The code above takes the emitted values from the interval(1000) Observable, then pipes them to the take(10) operator to take the first 10 values, then passes those values to the skipUntil operator, which has the interval(5000).pipe(take(1)) Observable passed in.

interval(5000).pipe(take(1)) emits a value after 5 seconds, which then will trigger the emission of the interval(1000) Observable’s emitted values.

Then we should see:

4  
5  
6  
7  
8  
9
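The timing in this example can be reproduced without waiting 10 seconds by modelling the emissions as timestamped records. The following plain-JavaScript sketch is not RxJS code: skipUntilModel and the record shape are hypothetical, and it assumes that a source value emitted at exactly the notifier's time is let through, which is what the output above suggests.

```javascript
// Plain-JavaScript model of skipUntil; not the RxJS implementation.
// source: hypothetical list of { time, value }; notifierTime: first notifier emission.
// Assumption: a value emitted at exactly notifierTime is let through.
function skipUntilModel(source, notifierTime) {
  return source
    .filter(event => event.time >= notifierTime)
    .map(event => event.value);
}

// interval(1000).pipe(take(10)) emits 0..9 at 1000 ms, 2000 ms, ..., 10000 ms.
const ticks = Array.from({ length: 10 }, (_, i) => ({
  time: (i + 1) * 1000,
  value: i
}));
console.log(skipUntilModel(ticks, 5000));
```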

skipWhile

The skipWhile operator returns an Observable that skips all items emitted by the source Observable until the condition returned by the predicate function becomes false.

It takes one argument, which is a predicate function to test each item with the condition returned by the function.

For example, we can write the following:

import { of } from "rxjs";  
import { skipWhile } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 5, 6).pipe(skipWhile(x => x < 3));  
numbers.subscribe(x => console.log(x));

Then the items from the of(1, 2, 3, 4, 5, 6) Observable won’t be emitted when the value is less than 3. This means that numbers will emit 3, 4, 5 and 6.

Once the predicate function has returned false once, the returned Observable keeps emitting, regardless of whether later values from the source Observable satisfy the condition or not.

So if we have:

import { of } from "rxjs";  
import { skipWhile } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 3, 2).pipe(skipWhile(x => x < 3));  
numbers.subscribe(x => console.log(x));

Then we get 3, 4, 3 and 2 since the condition becomes false once the first 3 was emitted by of(1, 2, 3, 4, 3, 2).
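The "stop skipping for good" behaviour comes down to a single flag. The sketch below models it in plain JavaScript. It is not the RxJS implementation, and skipWhileModel is a hypothetical name.

```javascript
// Plain-JavaScript model of skipWhile; not the RxJS implementation.
function skipWhileModel(values, predicate) {
  let skipping = true;
  const output = [];
  values.forEach((value, index) => {
    // The predicate is only consulted while we are still skipping.
    if (skipping && predicate(value, index)) return;
    skipping = false; // from now on everything passes
    output.push(value);
  });
  return output;
}

console.log(skipWhileModel([1, 2, 3, 4, 3, 2], x => x < 3)); // [3, 4, 3, 2]
```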

take

The take operator returns an Observable that emits the first count values emitted by the source Observable.

It takes one argument, count, which is the maximum number of values to emit.

For example, we can use it as follows:

import { of } from "rxjs";  
import { take } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 3, 2).pipe(take(2));  
numbers.subscribe(x => console.log(x));

Then we get 1 and 2 logged since we passed 2 into the take operator.

takeLast

The takeLast operator returns an Observable that emits the last count values emitted by the source Observable.

It takes one argument, count, which is the maximum number of values to emit from the end of the sequence of values.

For instance, we can use it as follows:

import { of } from "rxjs";  
import { takeLast } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 5, 6).pipe(takeLast(2));  
numbers.subscribe(x => console.log(x));

We should get 5 and 6 since we passed in 2 to the takeLast operator, which means we only want the last 2 values from the of(1, 2, 3, 4, 5, 6) Observable emitted.
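Since the operator cannot know which values are the last ones until the source completes, it has to remember the most recent count values and release them at the end. A minimal plain-JavaScript model of that buffering follows. It is a sketch, not RxJS code, and takeLastModel is a hypothetical name.

```javascript
// Plain-JavaScript model of takeLast; not the RxJS implementation.
function takeLastModel(values, count) {
  const buffer = [];
  for (const value of values) {
    buffer.push(value);
    // Keep at most `count` of the most recent values.
    if (buffer.length > count) buffer.shift();
  }
  return buffer; // released only once the source has completed
}

console.log(takeLastModel([1, 2, 3, 4, 5, 6], 2)); // [5, 6]
```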

takeUntil

The takeUntil operator returns an Observable that emits values from the source Observable until the notifier Observable emits a value.

It takes one argument, which is the notifier whose emitted value will cause the returned Observable to stop emitting values from the source Observable.

For example, we can use it as follows:

import { interval, timer } from "rxjs";  
import { takeUntil } from "rxjs/operators";

const source = interval(1000);  
const result = source.pipe(takeUntil(timer(5000)));  
result.subscribe(x => console.log(x));

The code above will emit the values from the source Observable until timer(5000) emits its first value.

So we should get something like:

0  
1  
2  
3

takeWhile

The takeWhile operator returns an Observable that emits the values emitted by the source Observable as long as the condition in the predicate function is satisfied and completes as soon as the condition is not satisfied.

Its main argument is the predicate function that returns the condition to check each value from the source Observable with. The function takes the item from the source Observable as the first parameter and the index, starting from 0, of the value emitted as the second parameter.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { takeWhile } from "rxjs/operators";

const source = interval(1000);  
const result = source.pipe(takeWhile(x => x <= 10));  
result.subscribe(x => console.log(x));

Then we should get:

0  
1  
2  
3  
4  
5  
6  
7  
8  
9  
10

since the predicate function is x => x <= 10, which means that values from the source Observable are emitted as long as they are less than or equal to 10. The returned Observable completes when 11 arrives.
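Unlike a plain filter, the operator stops for good at the first failing value. The plain-JavaScript sketch below models that, including the index parameter of the predicate. It is not RxJS code, and takeWhileModel is a hypothetical name.

```javascript
// Plain-JavaScript model of takeWhile; not the RxJS implementation.
function takeWhileModel(values, predicate) {
  const output = [];
  for (let index = 0; index < values.length; index++) {
    // The first failing value ends the sequence; later values are ignored.
    if (!predicate(values[index], index)) break;
    output.push(values[index]);
  }
  return output;
}

console.log(takeWhileModel([1, 5, 2], x => x < 3)); // [1], the later 2 is never seen
console.log(takeWhileModel(["a", "b", "c"], (x, index) => index < 2)); // by position
```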

The skipUntil operator returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.

The skipWhile operator returns an Observable that skips all items emitted by the source Observable until the condition in the predicate function becomes false.

The take operator returns an Observable that emits the specified number of values from the start of the source Observable.

The takeLast operator returns an Observable that emits the specified number of values from the end of the source Observable.

The takeUntil operator returns an Observable that emits values from the source Observable until the Observable passed into this operator emits a value.

Finally, the takeWhile operator returns an Observable that emits the values emitted by the source Observable as long as the condition passed into this is satisfied.


Rxjs Filtering Operators — Sample, Skip and Single

RxJS is a library for doing reactive programming. Filtering operators are useful for choosing which values from a source Observable get emitted to Observers.

In this article, we’ll look at some filtering operators, including the sample, sampleTime, single, skip, and skipLast operators.

sample

The sample operator returns an Observable that emits the value from the source Observable when the notifier Observable emits.

It takes one argument, which is the notifier Observable.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { sample } from "rxjs/operators";
const seconds = interval(1000);  
const result = seconds.pipe(sample(interval(5000)));  
result.subscribe(x => console.log(x));

In the code above, we have the seconds Observable which emits a number every second. Then we pipe the result of it to the sample operator, where we set the notifier Observable which is interval(5000), which emits every 5 seconds.

Therefore, we have a new Observable that emits values from seconds every 5 seconds.

We should see roughly every fifth value from seconds logged.
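To see which values a sampler would pick, the emissions can be modelled as timestamped records. The plain-JavaScript sketch below is not RxJS code: sampleModel and the record shape are hypothetical. It assumes a source value emitted at exactly the notifier's time counts as already seen, and it skips a sample when the source has emitted nothing new since the previous one.

```javascript
// Plain-JavaScript model of sample; not the RxJS implementation.
// source: hypothetical ordered list of { time, value };
// notifierTimes: ordered times at which the notifier emits.
function sampleModel(source, notifierTimes) {
  const output = [];
  let next = 0; // index of the next unread source event
  for (const tick of notifierTimes) {
    let latest;
    let hasNew = false;
    // Read every source event up to this notifier emission.
    while (next < source.length && source[next].time <= tick) {
      latest = source[next].value;
      hasNew = true;
      next++;
    }
    // Nothing new since the last sample means nothing is emitted.
    if (hasNew) output.push(latest);
  }
  return output;
}

// A source emitting 0..9 once per second, sampled at 5.5 s and 10.5 s.
const secondsTimeline = Array.from({ length: 10 }, (_, i) => ({
  time: (i + 1) * 1000,
  value: i
}));
console.log(sampleModel(secondsTimeline, [5500, 10500]));
```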

sampleTime

The sampleTime operator emits the most recently emitted value from the source Observable in periodic intervals.

It takes up to 2 arguments. The first is the period, which is the period to wait until emitting a value from the source Observable. It’s measured in milliseconds or the time unit of the optional scheduler .

The second argument is the optional scheduler, which defaults to async.

It returns a new Observable that emits value from the source Observable in a specified time interval.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { sampleTime } from "rxjs/operators";
const seconds = interval(1000);  
const result = seconds.pipe(sampleTime(5000));  
result.subscribe(x => console.log(x));

The code above takes the values emitted from the seconds Observable and pipes them to the sampleTime operator, so the result Observable emits the most recent value from seconds every 5 seconds.

We should see some numbers logged from the console.log .

single

single returns an Observable that emits the single item from the source Observable that matches a given condition returned by the predicate function.

If the source Observable emits more than one such item, the returned Observable emits an error. If the source Observable completes without emitting any items, the returned Observable emits an EmptyError.

If the source Observable emits items but none match the condition in the predicate function, then undefined is emitted in RxJS 6. RxJS 7 emits a NotFoundError instead.

It takes one optional argument, which is the predicate function that returns the condition to match the item emitted from the source Observable.

For example, if we have:

import { of } from "rxjs";  
import { single } from "rxjs/operators";
const numbers = of(1).pipe(single());  
numbers.subscribe(x => console.log(x));

Then we get 1 logged.

If we have:

import { of } from "rxjs";  
import { single } from "rxjs/operators";
const numbers = of(1, 2).pipe(single());  
numbers.subscribe(x => console.log(x));

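Then we get the error “Sequence contains more than one element”. Since we didn’t pass an error handler to subscribe, it’s reported as an unhandled error rather than logged by our console.log.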

If we have:

import { of } from "rxjs";  
import { single } from "rxjs/operators";
const numbers = of(1, 2).pipe(single(x => x % 2 === 0));  
numbers.subscribe(x => console.log(x));

Then we get 2 logged since 2 matches the condition x => x % 2 === 0 .
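The three cases above, plus the empty and no-match cases, can be summarised in one plain-JavaScript model. This is a sketch of the behaviour described in this section, not the RxJS implementation: singleModel is a hypothetical name, errors are thrown instead of being sent to an Observer, and the message used for an empty source is a placeholder.

```javascript
// Plain-JavaScript model of single; not the RxJS implementation.
// Errors are thrown here; a real Observable would send them to the Observer.
function singleModel(values, predicate = () => true) {
  if (values.length === 0) {
    throw new Error("empty source"); // placeholder message
  }
  const matches = values.filter((value, index) => predicate(value, index));
  if (matches.length > 1) {
    throw new Error("Sequence contains more than one element");
  }
  return matches[0]; // undefined when nothing matched, as described above
}

console.log(singleModel([1])); // the only value
console.log(singleModel([1, 2], x => x % 2 === 0)); // the only even value
try {
  singleModel([1, 2]); // two values match the default predicate
} catch (error) {
  console.log(error.message);
}
```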

skip

The skip operator returns an Observable that skips the first count items emitted by the source Observable and emits the rest.

It takes the required count argument, which is the number of items from the source Observable to skip.

For example, if we have:

import { of } from "rxjs";  
import { skip } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 5).pipe(skip(2));  
numbers.subscribe(x => console.log(x));

Then we get:

3  
4  
5

logged because we specified that we want to skip the first 2 emitted values from of(1, 2, 3, 4, 5) .

skipLast

The skipLast operator returns an Observable that skips the last count values emitted from the source Observable.

It takes the required count argument, which is the number of items from the source Observable to skip from the end of the source Observable.

For example, we can write:

import { of } from "rxjs";  
import { skipLast } from "rxjs/operators";

const numbers = of(1, 2, 3, 4, 5).pipe(skipLast(2));  
numbers.subscribe(x => console.log(x));

Then we get:

1  
2  
3

logged since we specified that we want to skip the last 2 emitted values.
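Because the operator cannot know in advance which values are the last ones, it holds values back until enough newer ones have arrived. The plain-JavaScript sketch below models that delay. It is not RxJS code, and skipLastModel is a hypothetical name.

```javascript
// Plain-JavaScript model of skipLast; not the RxJS implementation.
function skipLastModel(values, count) {
  const buffer = []; // values held back for now
  const output = [];
  for (const value of values) {
    buffer.push(value);
    // A value is released once `count` newer values have arrived after it.
    if (buffer.length > count) output.push(buffer.shift());
  }
  return output; // whatever is still buffered at completion is dropped
}

console.log(skipLastModel([1, 2, 3, 4, 5], 2)); // [1, 2, 3]
```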

The sample and sampleTime operators return an Observable that emits the value from the source Observable when the notifier Observable emits or at the specified time interval respectively.

single returns an Observable that emits the single item from the source Observable that matches a given condition.

The skip operator returns an Observable that skips the specified number of items from the start of the source Observable.

Finally, the skipLast operator returns an Observable that skips the specified number of values at the end of the source Observable.