RxJS is a library for reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at the throttle and join operators, including `throttle`, `throttleTime`, `combineAll` and `concatAll`.

# Filtering Operators

## Throttle

The `throttle` operator emits a value from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, then repeats the process.

It takes up to 2 arguments. The first is the `durationSelector`, which is a function that takes a value from the source Observable and returns an Observable or Promise that determines the throttling duration for that source value.

The second argument is an optional `config` object that defines the `leading` and `trailing` behavior. The default value is `defaultThrottleConfig`.

It returns an Observable that performs the throttle operation from the source.

For example, we can use it as follows:

```
import { interval } from "rxjs";
import { throttle } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttle(ev => interval(5000)));
result.subscribe(x => console.log(x));
```

The code above has an `interval$` Observable that emits a number every second. Its values are piped into the `throttle` operator, which takes a function that returns the `interval(5000)` Observable, which first emits after 5 seconds.

Therefore, after each value it lets through, the `throttle` operator ignores values from `interval$` for the next 5 seconds, since our `throttle` callback function returned `interval(5000)`.

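Then we should typically see 0, 6, 12 and so on logged by `console.log`. The value that arrives just as a 5-second window ends is usually still ignored, so about every 6th number gets through.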
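To make the timing concrete, here is a small plain-JavaScript sketch that replays the timeline without RxJS or real timers. It is only a simplified model, not how RxJS implements `throttle`: the names `throttleModel`, `durationFor` and `ticks` are made up for illustration, `durationFor` returns a number of milliseconds instead of an Observable, and we assume that a value arriving exactly when a window ends is still ignored.

```javascript
// Simplified model of leading-edge throttling. This is NOT RxJS's implementation.
// `events` is a list of { time, value } pairs sorted by time (in ms).
// `durationFor` is a hypothetical stand-in for durationSelector: it returns
// the window length in ms instead of an Observable.
function throttleModel(events, durationFor) {
  const emitted = [];
  let windowEnd = -Infinity; // no throttle window is open at the start
  for (const { time, value } of events) {
    // Assumption: a value arriving exactly when the window ends is still ignored.
    if (time <= windowEnd) continue;
    emitted.push(value);
    windowEnd = time + durationFor(value);
  }
  return emitted;
}

// interval(1000) emits 0 at 1000 ms, 1 at 2000 ms, and so on.
const ticks = Array.from({ length: 15 }, (_, n) => ({ time: (n + 1) * 1000, value: n }));
const throttled = throttleModel(ticks, () => 5000);
console.log(throttled); // [ 0, 6, 12 ]
```

Because `durationFor` receives the value that opened the window, the same model can also return a different window length for each value, which is what `durationSelector` lets us do.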

## throttleTime

`throttleTime` emits a value from the source Observable, then ignores subsequently emitted values for `duration` milliseconds, then repeats the process.

It takes up to 3 arguments. The first is the `duration`, which is the time to wait before emitting another value after emitting the last value. It’s measured in milliseconds or the time unit of the optional `scheduler`.

The second argument is the optional `scheduler`, which defaults to `async`. It’s used for setting the timing of the emission.

The last argument is the optional `config`, which defaults to `defaultThrottleConfig`. We can pass in an object to define the `leading` and `trailing` behavior. The default value is `{ leading: true, trailing: false }`.

For example, we can use it as follows:

```
import { interval } from "rxjs";
import { throttleTime } from "rxjs/operators";
const interval$ = interval(1000);
const result = interval$.pipe(throttleTime(5000));
result.subscribe(x => console.log(x));
```

The above works like our previous `throttle` example, except that we change `throttle(ev => interval(5000))` to `throttleTime(5000)`, which does the same thing.

We let a number from `interval$` through, then ignore the numbers emitted during the next 5 seconds.

Then we get the same numbers logged as the example above.

# Join Operators

## combineAll

The `combineAll` operator flattens an Observable of Observables by applying `combineLatest` once the outer Observable completes.

It takes one optional argument, which is a `project` function that maps the latest values from the inner Observables to the value we want to emit.

Once the outer Observable completes, it subscribes to all collected Observables and combines their values as follows:

- Every time an inner Observable emits, the output Observable emits (once every inner Observable has emitted at least one value).
- When the returned Observable emits and a `project` function is specified, `project` is called with the most recent value from each inner Observable, and its result is emitted.
- If there’s no `project` function, an array of the most recent values is emitted by the returned Observable.

For example, we can use it as follows:

```
import { of } from "rxjs";
import { map, combineAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(combineAll());
result.subscribe(x => console.log(x));
```

The code above maps the `of$` Observable to child Observables. It returns `of("a", "b", "c")` for each value emitted by `of$`.

Then we use `combineAll` to combine the latest values from each of the child Observables in `higherOrderObservable` into an array. Then each of these arrays is emitted by the `result` Observable.

Then we get:

```
["c", "c", "a"]
["c", "c", "b"]
["c", "c", "c"]
```

as the `console.log` output. Since `of` emits synchronously, the first 2 inner Observables have already emitted all their values by the time the third one is subscribed, so we get `'c'` from them. Then, as the third one emits, the last value in the array is updated, so we get `'a'`, `'b'` and `'c'` respectively from it.
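The ordering above can be reproduced with a short plain-JavaScript sketch. It is a simplified model that only covers synchronous inner sources, represented here as plain arrays, and it is not RxJS's implementation. The names `combineAllSync`, `inner`, `combined` and `joined` are made up for illustration.

```javascript
// Simplified model of combineAll for synchronous inner sources only.
// Each inner source is a plain array that "emits" all of its values
// as soon as it is subscribed. This is NOT RxJS's implementation.
function combineAllSync(innerSources, project) {
  const latest = new Array(innerSources.length);
  const hasEmitted = new Array(innerSources.length).fill(false);
  const output = [];
  innerSources.forEach((values, index) => {
    for (const value of values) {
      latest[index] = value;
      hasEmitted[index] = true;
      // Nothing is emitted until every inner source has produced a value.
      if (hasEmitted.every(Boolean)) {
        output.push(project ? project(...latest) : [...latest]);
      }
    }
  });
  return output;
}

const inner = [["a", "b", "c"], ["a", "b", "c"], ["a", "b", "c"]];

// Without a project function, arrays of the latest values are emitted.
const combined = combineAllSync(inner);
console.log(combined); // [ ["c","c","a"], ["c","c","b"], ["c","c","c"] ]

// With a project function, its result is emitted instead.
const joined = combineAllSync(inner, (x, y, z) => x + y + z);
console.log(joined); // [ "cca", "ccb", "ccc" ]
```

The first two arrays are fully consumed before the third one starts, which is why only the last position changes in the output.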

## concatAll

The `concatAll` operator flattens a higher-order Observable by concatenating the inner Observables in order.

It takes no parameters and returns an Observable that emits the values from all inner Observables concatenated.

For example, we can use it as follows:

```
import { of } from "rxjs";
import { map, concatAll } from "rxjs/operators";
const of$ = of(1, 2, 3);
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));
const result = higherOrderObservable.pipe(concatAll());
result.subscribe(x => console.log(x));
```

Each value emitted by the `of$` Observable is mapped to an `of("a", "b", "c")` Observable.

Then we `pipe` `higherOrderObservable`, which emits the 3 `of("a", "b", "c")` Observables mapped from `of$`, into `concatAll`, which subscribes to them one after another and emits their values in order.

In the end, we get:

```
a
b
c
a
b
c
a
b
c
```
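With synchronous inner Observables the ordering is easy to miss, so here is a plain-JavaScript sketch of what happens when the inner sources take time to emit. It is a simplified model, not RxJS's implementation: each inner source is a hypothetical list of `{ delay, value }` pairs, with delays in ascending order and measured from the moment that source is subscribed, and we assume a source completes right after its last value.

```javascript
// Simplified model of concatAll timing. This is NOT RxJS's implementation.
// An inner source is subscribed only after the previous one has completed.
function concatAllTimeline(innerSources) {
  const timeline = [];
  let subscribeAt = 0; // time at which the current inner source is subscribed
  for (const source of innerSources) {
    let completedAt = subscribeAt;
    for (const { delay, value } of source) {
      const time = subscribeAt + delay;
      timeline.push({ time, value });
      completedAt = Math.max(completedAt, time);
    }
    subscribeAt = completedAt; // the next inner source starts only now
  }
  return timeline;
}

const slow = [{ delay: 300, value: "slow-1" }, { delay: 600, value: "slow-2" }];
const fast = [{ delay: 100, value: "fast-1" }, { delay: 200, value: "fast-2" }];
const timeline = concatAllTimeline([slow, fast]);
console.log(timeline.map(e => `${e.time}ms ${e.value}`));
// [ "300ms slow-1", "600ms slow-2", "700ms fast-1", "800ms fast-2" ]
```

Even though the second source is faster, its values only show up after the first source has finished, because it isn't subscribed until then.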

# Conclusion

The `throttle` operator emits a value from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, and repeats.

`throttleTime` emits a value from the source Observable, then ignores subsequently emitted values for a given amount of time, and repeats.

`combineAll` flattens an Observable of Observables by combining the latest values of the inner Observables once the outer Observable completes.

The `concatAll` operator concatenates the emitted values of inner Observables in order.