
Rxjs Operators — Utility Operators

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at some utility operators that help us do various things, including the tap, delay, delayWhen, and dematerialize operators.

tap

The tap operator returns an Observable that lets us perform side effects for every emission on the source Observable and then emits the same values as the source.

It takes 3 optional arguments. The first is nextOrObserver, which is either an Observer object or a callback for next. The second is an error callback, which is called when the source emits an error. The last is the complete callback, which is called when the source Observable completes.

It lets us intercept each emission from the source and do something at that point by running a function. Then it passes the same output as the source Observable along to the subscriber.

For example, we can use it as follows:

import { of } from "rxjs";  
import { tap } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  tap(  
    val => console.log(`value: ${val}`),  
    error => console.log(error),  
    () => console.log("complete")  
  )  
);  
result.subscribe(x => console.log(x));

Then we should get:

value: 1  
1  
value: 2  
2  
value: 3  
3  
complete

since we logged the values emitted with:

val => console.log(`value: ${val}`)

And we logged 'complete' when the of$ Observable completes by writing:

() => console.log("complete")
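
To make the pass-through behavior concrete without depending on RxJS, here is a minimal plain-JavaScript sketch. The tapValue helper is hypothetical (it is not an RxJS API); it only mimics the idea of running a side effect and then handing the same value on to the next step.

```javascript
// Hypothetical helper, not an RxJS API: run a side effect, then return the value unchanged.
const tapValue = sideEffect => value => {
  sideEffect(value);
  return value;
};

const logged = [];
const result = [1, 2, 3]
  .map(tapValue(val => logged.push(`value: ${val}`))) // side effect only
  .map(val => val * 10); // later steps still see the original values

console.log(logged);
console.log(result);
```

The side effect records each value, but the second map still receives 1, 2 and 3, which is the same guarantee tap gives to operators further down a pipe.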

delay

The delay operator returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

It takes up to 2 arguments. The first is the delay, which is the delay duration in milliseconds or a Date until which the emission of the source items is delayed.

The second argument is an optional scheduler, which defaults to async. It lets us change the timing mechanism for emitting the values by the returned Observable.

For example, we can use it as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delay(1000));  
result.subscribe(x => console.log(x));

The code above delays the emission of values from the of$ Observable by a second with delay(1000).

We should get 1, 2, and 3 emitted after the delay.

Also, we can delay emission to a specific date as follows:

import { of } from "rxjs";  
import { delay } from "rxjs/operators";  
import moment from "moment";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(  
  delay(  
    moment()  
      .add(5, "seconds")  
      .toDate()  
  )  
);  
result.subscribe(x => console.log(x));

The code above will delay the emission of values from of$ by 5 seconds from the current date and time.

The output should be unchanged.
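
If pulling in moment only for this is not desirable, the target Date can probably be built with the built-in Date object instead. The sketch below assumes a hypothetical dateFromNow helper; it is not part of RxJS or moment.

```javascript
// Hypothetical helper: build a Date that lies `ms` milliseconds after `now`.
const dateFromNow = (ms, now = Date.now()) => new Date(now + ms);

const fiveSecondsLater = dateFromNow(5000);
console.log(fiveSecondsLater.toISOString());
```

A Date built this way could be passed to delay in place of the moment() chain.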

delayWhen

The delayWhen operator returns an Observable that delays the emission from the source Observable by a given time span determined by the emission of another Observable.

It takes up to 2 arguments. The first is the delayDurationSelector function that returns an Observable for each value emitted by the source Observable. When the Observable returned by this function emits, then the value from the source Observable will emit.

The second argument is the optional subscriptionDelay. It’s an Observable that triggers the subscription to the source Observable once it emits any value.

For instance, we can use it as follows:

import { of, interval } from "rxjs";  
import { delayWhen } from "rxjs/operators";  
const of$ = of(1, 2, 3);  
const result = of$.pipe(delayWhen(val => interval(Math.random() * 5000)));  
result.subscribe(x => console.log(x));

The code above will delay the emission of each value emitted by the of$ Observable by Math.random() * 5000 milliseconds. This means that each value will be emitted when its own interval(Math.random() * 5000) Observable first emits.

The result will be that 1, 2 and 3 will be emitted in a random order.
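
The ordering can be reasoned about without running any timers: each value is released when its own delay elapses, so the output order follows the delays rather than the source order. The following plain-JavaScript sketch uses a hypothetical releaseOrder helper and made-up delays to illustrate this; it does not use RxJS.

```javascript
// Hypothetical helper: given values and the delay (in ms) picked for each one,
// return the values in the order in which they would be released.
const releaseOrder = (values, delays) =>
  values
    .map((value, i) => ({ value, delay: delays[i] }))
    .sort((a, b) => a.delay - b.delay) // shortest delay is released first
    .map(entry => entry.value);

// Example delays standing in for three Math.random() * 5000 results.
const order = releaseOrder([1, 2, 3], [3200, 400, 1500]);
console.log(order);
```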

dematerialize

The dematerialize operator returns an Observable that turns an Observable of Notification objects into the emissions that they represent.

It takes no parameters.

For example, we can use it as follows:

import { of, Notification } from "rxjs";  
import { dematerialize } from "rxjs/operators";

const notifA = new Notification("N", 1);  
const notifB = new Notification("N", 2);  
const notifE = new Notification("E", undefined, new Error("error"));  
const materialized = of(notifA, notifB, notifE);  
const result = materialized.pipe(dematerialize());
result.subscribe(x => console.log(x), e => console.error(e));

Then we should get:

1  
2  
Error: error

This is because dematerialize converted the Notification objects into the values and the error that are emitted by the returned Observable.

The second argument passed into the Notification constructor is the value that’s emitted for a normal ("N") notification. The third argument of the Notification constructor is the error for an error ("E") notification.
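
As a rough mental model, each notification is replayed as the observer call it describes. The sketch below is plain JavaScript, not RxJS: the replay helper and the object shape (kind, value, error) are assumptions that mirror the "N" and "E" kinds used above, plus "C" for completion.

```javascript
// Plain objects standing in for Notification instances (assumed shape: kind, value, error).
const notifications = [
  { kind: "N", value: 1 },
  { kind: "N", value: 2 },
  { kind: "E", error: new Error("error") }
];

// Hypothetical helper: replay each notification as the observer call it represents.
function replay(items, observer) {
  for (const item of items) {
    if (item.kind === "N") {
      observer.next(item.value);
    } else if (item.kind === "E") {
      observer.error(item.error);
      return; // an error ends the sequence
    } else if (item.kind === "C") {
      observer.complete();
      return; // completion ends the sequence
    }
  }
}

const seen = [];
replay(notifications, {
  next: value => seen.push(value),
  error: err => seen.push(`Error: ${err.message}`),
  complete: () => seen.push("complete")
});
console.log(seen);
```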

tap returns an Observable that lets us perform some side effects for every emission on the source Observable and then emit the same values from the source.

delay returns an Observable that delays the emission of items from the source Observable by the specified delay value or until a given Date.

delayWhen returns an Observable that delays the emission from the source Observable by a given time span which is determined by the emission of another Observable.

dematerialize returns an Observable object that emits the values set in the Notification objects emitted by the source Observable.


Node.js FS Module — Creating Directories

Manipulating files and directories is a basic operation for any program. Since Node.js is a server-side platform and can interact with the computer that it’s running on directly, being able to manipulate files is a core feature.

Fortunately, Node.js has the fs module built into its library. It has many functions that can help with manipulating files and folders. Supported operations include basic ones like opening, reading, writing and deleting files, and the same kinds of operations for directories.

It can do this both synchronously and asynchronously. The asynchronous API also has functions that support promises.

Also, it can show statistics for a file. Almost all the file operations that we can think of can be done with the built-in fs module. In this article, we will create directories with the mkdir and mkdtemp family of functions to create normal directories and temporary directories respectively.

Creating Permanent Directories with fs.mkdir Family of Functions

To create permanent directories, we can use the mkdir function to create them asynchronously. It takes 3 arguments. The first argument is the path, which can be a string, a Buffer object or a URL object.

The second argument is an object with various properties that we can set as options.

The recursive property is a boolean property that lets us create all the levels of the directory if not all the levels exist. The default value of the recursive property is false.

The mode property is an octal number that sets the directory’s permissions and sticky bit on POSIX systems. This option isn’t supported on Windows.

The default value for mode is 0o777, which is readable, writable and executable for everyone.

The mode integer can also replace the options object as the second argument. The third argument is a callback function which is called when the directory creation operation is complete.

The callback takes an err parameter, which is null when the operation succeeds and is an object with the error information otherwise.

Calling mkdir when the path is a directory that exists results in an error only when the recursive option is set to false.

We can use the mkdir function like in the following code:

const fs = require("fs");  
const dirToCreate = "./files/createdFolder/createdFolder";

fs.mkdir(  
  dirToCreate,  
  {  
    recursive: true,  
    mode: 0o777
  },  
  err => {  
    if (err) {  
      throw err;  
    }  
    console.log("Directory created!");  
  }  
);

If we run the code above, we can see that it created all the parent directories for the lowest level directories since we set recursive to true .

The synchronous version of the mkdir function is the mkdirSync function. It takes 2 arguments. The first argument is the path, which can be a string, a Buffer object or a URL object.

The second argument is an object with various properties that we can set as options. The recursive property is a boolean property that lets us create all the levels of the directory if not all the levels exist. The default value of the recursive property is false.

The mode property is an octal number that sets the directory’s permissions and sticky bit on POSIX systems.

This option isn’t supported on Windows. The default value for mode is 0o777, which is readable, writable and executable for everyone. The mode integer can also replace the options object as the second argument. The function returns undefined or, in recent Node.js versions, the first directory path created when recursive is true.

Calling mkdirSync when the path is a directory that exists results in an error only when the recursive option is set to false.

We can use the mkdirSync function like in the following code:

const fs = require("fs");  
const dirToCreate = "./files/createdFolder/createdFolder";

fs.mkdirSync(dirToCreate, {  
  recursive: true,  
  mode: 0o777
});  
console.log("Directory created!");

If we run the code above, we can see that it created all the parent directories for the lowest level directories since we set recursive to true .
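
The effect of the recursive option on an already existing directory can be checked with a small experiment. This is a sketch that works inside a throwaway folder under the operating system's temporary directory; the folder names are arbitrary, and fs.rmSync assumes a reasonably recent Node.js version.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Work inside a fresh temporary directory so the sketch leaves the project untouched.
const base = fs.mkdtempSync(path.join(os.tmpdir(), "mkdir-demo-"));
const target = path.join(base, "a", "b");

// First call creates both levels.
fs.mkdirSync(target, { recursive: true });

// Second call without recursive: the directory exists, so an error is thrown.
let nonRecursiveError = null;
try {
  fs.mkdirSync(target);
} catch (err) {
  nonRecursiveError = err.code;
}

// Second call with recursive: true: an existing directory is not treated as an error.
let recursiveError = null;
try {
  fs.mkdirSync(target, { recursive: true });
} catch (err) {
  recursiveError = err.code;
}

console.log(nonRecursiveError, recursiveError);

// Clean up the temporary tree.
fs.rmSync(base, { recursive: true, force: true });
```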

There’s also a promise version of the mkdir function. It takes 2 arguments. The first argument is the path, which can be a string, a Buffer object or a URL object.

The second argument is an object with various properties that we can set as options. The recursive property is a boolean property that lets us create all the levels of the directory if not all the levels exist. The default value of the recursive property is false.

The mode property is an octal number that sets the directory’s permissions and sticky bit on POSIX systems. This option isn’t supported on Windows.

The default value for mode is 0o777, which is readable, writable and executable for everyone. The mode integer can also replace the options object as the second argument.

Calling the promise version of mkdir when the path is a directory that exists results in a rejected promise only when the recursive option is set to false.

The function returns a promise that resolves with no argument when the directory creation operation succeeds. In recent Node.js versions, it resolves with the first directory path created when recursive is true.

We can use the promise version of the mkdir function like in the following code:

const fsPromises = require("fs").promises;  
const dirToCreate = "./files/createdFolder/createdFolder";

(async () => {  
  await fsPromises.mkdir(dirToCreate, {  
    recursive: true,  
    mode: 0o777
  });  
  console.log("Directory created!");  
})();

If we run the code above, we can see that it created all the parent directories for the lowest level directories since we set recursive to true . This is a better option than using mkdirSync for creating directories sequentially since it doesn’t tie up the whole program like the synchronous version does when the directory creation operation is being run.
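
Because the promise version doesn’t block, several directories can also be created at the same time. The following is a minimal sketch of that idea; the createAll helper and the folder names are hypothetical, and it works inside a temporary directory that is removed afterwards (fsPromises.rm assumes a reasonably recent Node.js version).

```javascript
const fsPromises = require("fs").promises;
const os = require("os");
const path = require("path");

// Hypothetical helper: create several directory trees concurrently,
// then report whether each one exists as a directory.
async function createAll(base, names) {
  await Promise.all(
    names.map(name =>
      fsPromises.mkdir(path.join(base, name, "nested"), { recursive: true })
    )
  );
  const stats = await Promise.all(
    names.map(name => fsPromises.stat(path.join(base, name, "nested")))
  );
  return stats.map(stat => stat.isDirectory());
}

// Run the sketch inside a throwaway temporary directory.
const done = (async () => {
  const base = await fsPromises.mkdtemp(path.join(os.tmpdir(), "mkdir-promise-"));
  try {
    return await createAll(base, ["logs", "cache"]);
  } finally {
    await fsPromises.rm(base, { recursive: true, force: true });
  }
})();

done.then(result => console.log(result));
```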

Creating Temporary Directories with fs.mkdtemp Family of Functions

The Node.js fs module has special functions for creating temporary directories. The mkdtemp family of functions allows us to do this with one function call.

The mkdtemp function takes 3 arguments. The first argument is the prefix for the temporary folder, which is a string. It’s the path of the folder that we want to create, which will have some characters appended to it. The second argument is an options string or object.

The options object consists of the encoding property, which is the character encoding for the folder name and defaults to utf8.

The encoding value string can replace the object as the second argument. The second argument is optional.

The third argument is a callback function that is called when the temporary directory creation operation ends.

The callback function has 2 parameters. The first one is the err object, which is null when the operation succeeds. The second parameter is the folder path, which is a string.

The name of the generated folder will be the prefix with 6 random characters appended to it. On some systems, like BSD systems, it can have more than 6 random characters after the prefix.

If we want to create a temporary directory within /tmp, then the prefix must end with a platform-specific path separator, which we can get from require('path').sep.

We can create a temporary directory with the following code:

const fs = require("fs");  
const prefix = "./files/tempDir";

fs.mkdtemp(  
  prefix,  
  {  
    encoding: "utf8"  
  },  
  (err, folder) => {  
    if (err) {  
      throw err;  
    }  
    console.log("Temp directory created!", folder);  
  }  
);

If we run the code, we get the folder path logged in the console.log and we should see a new temporary folder created in the given path.

The mkdtemp function has a synchronous counterpart called the mkdtempSync function. The mkdtempSync function takes 2 arguments. The first argument is the prefix for the temporary folder, which is a string.

It’s the path of the folder that we want to create, which will have some characters appended to it.

The second argument is an optional string or object. The options object consists of the encoding property, which is the character encoding for the folder name and defaults to utf8.

The encoding value string can replace the object as the second argument. The function returns the created folder path.

If we want to create a temporary directory within /tmp, then the prefix must end with a platform-specific path separator, which we can get from require('path').sep.

We can use the mkdtempSync function like in the following code:

const fs = require("fs");  
const prefix = "./files/tempDir";  
try {  
  const folder = fs.mkdtempSync(prefix, {  
    encoding: "utf8"  
  });  
  console.log("Temp directory created!", folder);  
} catch (error) {  
  console.error(error);  
}

If we run the code, we get the folder path logged in the console.log and we should see a new temporary folder created in the given path.
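
To illustrate the rule about the trailing path separator, the sketch below creates two temporary folders inside the operating system's temporary directory. Using os.tmpdir() instead of a hard-coded /tmp and the "demo-" name fragment are choices made for this example only.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

const tmpRoot = os.tmpdir();

// Prefix ends with the separator: the new folder is created inside the temp directory.
const inside = fs.mkdtempSync(`${tmpRoot}${path.sep}`);

// Prefix ends with a name fragment: the folder is still inside the temp directory,
// and its name starts with "demo-" followed by the random characters.
const named = fs.mkdtempSync(path.join(tmpRoot, "demo-"));

console.log(inside);
console.log(named);

// Remove the (empty) folders again.
fs.rmdirSync(inside);
fs.rmdirSync(named);
```

Without the separator, a prefix such as "/tmp" would instead ask for a sibling folder named like /tmpXXXXXX, which is usually not what we want.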

There’s also a promise version of the mkdtemp function. It takes 2 arguments. The first argument is the prefix for the temporary folder, which is a string.

It’s the path of the folder that we want to create, which will have some characters appended to it. The second argument is an optional string or object.

The options object consists of the encoding property, which is the character encoding for the folder name and defaults to utf8.

The encoding value string can replace the object as the second argument.

It returns a Promise that resolves with the path of the created directory when the temporary directory creation operation succeeds.

If we want to create a temporary directory within /tmp, then the prefix must end with a platform-specific path separator, which we can get from require('path').sep.

We can use the promise version of the mkdtemp function like in the following code:

const fsPromises = require("fs").promises;  
const prefix = "./files/tempDir";

(async () => {  
  try {  
    const folder = await fsPromises.mkdtemp(prefix, {  
      encoding: "utf8"  
    });  
    console.log("Temp directory created!", folder);  
  } catch (error) {  
    console.error(error);  
  }  
})();

If we run the code, we get the folder path logged in the console.log and we should see a new temporary folder created in the given path. This is a better option than using mkdtempSync for creating directories sequentially since it doesn’t tie up the whole program like the synchronous version does when the directory creation operation is being run.

We created permanent directories with the mkdir family of functions. The mkdir family of functions can create all the parent directories along with the actual directory that we want to create.

Also, we created temporary directories with the mkdtemp family of functions.

For each function, there’s the regular callback-based asynchronous version, the newer promise-based asynchronous version, and the synchronous version.

The promise versions of the functions are better for creating directories sequentially since they don’t tie up the whole program like the synchronous versions do while the directory creation operation is being run.


Messing with JavaScript This with Call, Bind and Apply

In JavaScript, the this object can mean different things in different contexts. Its value mostly depends on how the function that contains the this keyword is called. If it’s in a method that’s called on an object, then this is that object. If it’s in a regular function that’s called on its own, then this is the global object in non-strict mode and undefined in strict mode. For classes, this is the instance being created or the instance that a method is called on, since classes are just syntactic sugar for normal constructor functions.

In JavaScript functions, there are the call, bind, and apply methods to change the value of this inside a function. In this article, we’ll look at them closely and how to use them to change the value of this. These methods only affect regular functions and not arrow functions.

call

The call method lets us call a function with the given this object that we pass into the argument when we call it and arguments for the function provided individually.

It takes multiple arguments. The first argument is an object that we want to set to be the value of this inside the function that we’re calling the call method on. It’s an optional argument. We can set it to an object, null or undefined . In non-strict mode, null and undefined will be replaced with the global object and primitive values will be converted to objects. The second and subsequent arguments are arguments that we pass into the function.

The call method returns whatever the function returns when it’s called with the specified this value and arguments.

The purpose of the call method is that we can reuse the same function with a different this value without writing a new function for each this value that we want.

For example, we can change the value of this in a function with the call method by writing the following code:

let obj = {  
  firstName: 'Joe',  
  lastName: 'Smith'  
}

let person  = {  
  firstName: 'Jane',  
  lastName: 'Smith',  
  getFullName()  {  
    return `${this.firstName} ${this.lastName}`  
  }  
}

console.log(person.getFullName());  
console.log(person.getFullName.call(obj));

In the code above, when we call person.getFullName() we get ‘Jane Smith’ since we have firstName set to 'Jane' in the person object and lastName set to 'Smith' in the same object. Since this is the person object inside the getFullName function, we get back ‘Jane Smith’ as a result.

When we use the call method to change the this object inside the getFullName function to obj by calling person.getFullName.call(obj), we get ‘Joe Smith’, since we changed this to obj, which has firstName set to 'Joe' and lastName set to 'Smith'.
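
call can also forward extra arguments, and it hands back whatever the function returns. The sketch below uses a hypothetical standalone describe function (it isn’t part of the example above) to show both.

```javascript
// Hypothetical standalone function that relies on `this` and two parameters.
function describe(greeting, punctuation) {
  return `${greeting}, ${this.firstName} ${this.lastName}${punctuation}`;
}

const joe = { firstName: "Joe", lastName: "Smith" };

// The first argument becomes `this`; the rest are passed to the function one by one.
const message = describe.call(joe, "Hello", "!");
console.log(message);
```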

We can also use the call method for constructor functions. For example, if we have:

const Person = function(firstName, lastName) {  
  this.firstName = firstName;  
  this.lastName = lastName;  
}

const Employee = function(firstName, lastName, employeeCode) {
  this.employeeCode = employeeCode;  
  Person.call(this, firstName, lastName)  
}

const employee = new Employee('Jane', 'Smith', '123');  
console.log(`${employee.firstName} ${employee.lastName}`, employee.employeeCode);

Then in the Employee constructor, we call the Person constructor function that we previously defined. When we create a new Employee object, we pass in a new firstName and lastName , and then inside the Employee constructor, we used the call method of the Person function to pass the firstName and lastName to the Person constructor, thereby setting the value for the firstName and lastName properties of this without setting them in the Employee constructor directly.

When we run console.log on the last line, we get 'Jane Smith' and '123'.

Another example is that we can use call with anonymous functions. For example, we can write the following function to log a greeting:

(function(greeting) {  
  console.log(`${greeting} ${this.firstName} ${this.lastName}`);  
}.call({  
  firstName: 'Joe',  
  lastName: 'Smith'  
}, 'Hello'));

In the function call above, we pass in an object with the firstName and lastName properties to the first argument of the call method of the anonymous function. Then we pass in the string 'Hello' in the second argument, which is passed into the first parameter of the anonymous function. This means that 'Hello' is the value of the greeting parameter, and this.firstName is 'Joe' and this.lastName is 'Smith' .

So, we should get 'Hello Joe Smith' when we run the code above.
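
The difference between strict and non-strict mode mentioned earlier can be observed directly. In this sketch, the function names are made up for illustration, and the non-strict function is built with the Function constructor so that it stays non-strict no matter how the surrounding file is loaded.

```javascript
// Strict-mode function: `this` is used exactly as it was passed in.
function strictThis() {
  "use strict";
  return this;
}

// Functions built with the Function constructor are non-strict unless their body opts in.
const sloppyThis = new Function("return this;");

const strictUndefined = strictThis.call(undefined); // stays undefined
const sloppyUndefined = sloppyThis.call(undefined); // replaced with the global object
const strictPrimitive = typeof strictThis.call(1);  // primitive is kept as a number
const sloppyPrimitive = typeof sloppyThis.call(1);  // primitive is wrapped in an object

console.log(strictUndefined === undefined, sloppyUndefined === globalThis);
console.log(strictPrimitive, sloppyPrimitive);
```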

apply

The apply method is almost the same as the call method, except that we pass in an array as the second argument to supply the arguments for the function call, instead of a comma-separated list of arguments.

This means we can replace call with apply in the examples above by making some small changes.

The first example would be the same except that we change call to apply since we didn’t pass in the second or subsequent arguments:

let obj = {  
  firstName: 'Joe',  
  lastName: 'Smith'  
}

let person  = {  
  firstName: 'Jane',  
  lastName: 'Smith',  
  getFullName()  {  
    return `${this.firstName} ${this.lastName}`  
  }  
}

console.log(person.getFullName());  
console.log(person.getFullName.apply(obj));

If we chain constructors with the apply method for constructor functions, then we have to change call to apply and change the arguments list into an array. For example, if we have:

const Person = function(firstName, lastName) {  
  this.firstName = firstName;  
  this.lastName = lastName;  
}
const Employee = function(firstName, lastName, employeeCode) {  
  this.employeeCode = employeeCode;  
  Person.call(this, firstName, lastName)  
}

const employee = new Employee('Jane', 'Smith', '123');  
console.log(`${employee.firstName} ${employee.lastName}`, employee.employeeCode);

like we did above, then we just have to change it to:

const Person = function(firstName, lastName) {  
  this.firstName = firstName;  
  this.lastName = lastName;  
}

const Employee = function(firstName, lastName, employeeCode) {  
  this.employeeCode = employeeCode;  
  Person.apply(this, [firstName, lastName])  
}

const employee = new Employee('Jane', 'Smith', '123');  
console.log(`${employee.firstName} ${employee.lastName}`, employee.employeeCode);

Then we get the same output as we did above.

Finally, for the last example, we can change the code from:

(function(greeting) {  
  console.log(`${greeting} ${this.firstName} ${this.lastName}`);  
}.call({  
  firstName: 'Joe',  
  lastName: 'Smith'  
}, 'Hello'));

to:

(function(greeting) {  
  console.log(`${greeting} ${this.firstName} ${this.lastName}`);  
}.apply({  
  firstName: 'Joe',  
  lastName: 'Smith'  
}, ['Hello']));

Then we get the same output as we did before the change.
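
apply is most convenient when the arguments already live in an array. As a small illustration that is separate from the examples above, the built-in Math.max function can be given an array of numbers in the following ways:

```javascript
const numbers = [4, 9, 2];

// apply spreads the array into individual arguments for Math.max.
const viaApply = Math.max.apply(null, numbers);

// call needs the arguments listed one by one.
const viaCall = Math.max.call(null, 4, 9, 2);

// Spread syntax is a modern alternative when no particular `this` is needed.
const viaSpread = Math.max(...numbers);

console.log(viaApply, viaCall, viaSpread);
```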

bind

The bind method creates a new function with the value of this fixed to the object that we pass in, instead of calling the function right away. Like call and apply, it can also be used to send arguments into the function.

It takes multiple arguments. The first argument is an object that we want to set to be the value of this inside the function that we’re calling the bind method on. This argument is ignored if the bound function is constructed with the new operator. When we use the bind method to create a function that’s supplied as a callback inside the setTimeout function, then any primitive value passed into the first argument is converted to an object. If no arguments are provided to bind, the this value of the executing scope is treated as the value for the new function.

The second and subsequent arguments are arguments to prepend to the list of arguments when we call the bound function.

The return value of the bind method is a new function that wraps the function it’s called on, with the specified this value and initial arguments.

For example, if we call bind with no argument like in the following code:

let person = {  
  firstName: 'Jane',  
  lastName: 'Smith',  
  getName(){  
    return `${this.firstName} ${this.lastName}`  
  }  
}

console.log(person.getName.bind()());

Then we get 'undefined undefined' in the browser, because this falls back to the window object when the code is run at the top level, and window has no firstName or lastName properties.

If we call bind with an object passed in, like in the following code:

let person = {  
  firstName: 'Jane',  
  lastName: 'Smith',  
  getName() {  
    return `${this.firstName} ${this.lastName}`  
  }  
}

const joe = {  
  firstName: 'Joe',  
  lastName: 'Smith'  
}

console.log(person.getName.bind(joe)());

We get 'Joe Smith' since we passed in the joe object into the first argument of the bind method, which sets the this object inside the getName method to the joe object.

Also, we can pass arguments into a function with bind. For example, we can add a greet method that takes a greeting and an age parameter and returns a new string with both of them combined with the existing fields, as we have in the following code:

let person = {  
  firstName: 'Jane',  
  lastName: 'Smith',  
  getName() {  
    return `${this.firstName} ${this.lastName}`  
  },  
  greet(greeting, age) {  
    return `${greeting} ${this.firstName} ${this.lastName}. You're ${age} years old`  
  }  
}

const joe = {  
  firstName: 'Joe',  
  lastName: 'Smith'  
}

console.log(person.greet.bind(joe, 'Hello', 20)());

If we run the code above, then we get 'Hello Joe Smith. You're 20 years old' since we set this to joe by passing joe into the first argument of the bind method. Then we pass in our arguments to the person.greet method by passing in 'Hello' and 20 as the second and third arguments of the bind method, which get passed into the first and second arguments of the greet method call.
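
Since bind gives us a function instead of calling it, we can fix only some of the arguments up front and supply the rest later. The following sketch reuses a greet method like the one above; the greetJoe and jane names are introduced only for this illustration.

```javascript
const person = {
  greet(greeting, age) {
    return `${greeting} ${this.firstName} ${this.lastName}. You're ${age} years old`;
  }
};

const joe = { firstName: "Joe", lastName: "Smith" };
const jane = { firstName: "Jane", lastName: "Smith" };

// bind returns a new function; only `this` and the first argument are fixed here.
const greetJoe = person.greet.bind(joe, "Hello");

const first = greetJoe(20); // the remaining argument is supplied at call time
const second = greetJoe(21);

// A bound function keeps its `this`, even when call supplies another object.
const stillJoe = greetJoe.call(jane, 30);

console.log(first);
console.log(second);
console.log(stillJoe);
console.log(greetJoe === person.greet);
```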

We can also use it with the setTimeout function. According to the definition of bind that we have above, if we pass in a primitive value to the first argument of bind of the callback function for the setTimeout function, then it’ll be converted to an object. For example, if we have:

setTimeout(function(x, y) {  
  console.log(this, x, y);  
}.bind(1, 1, 1), 1)

Then we get Number {1} 1 1 from the console.log output in the callback function since we have the primitive value one in the first argument converted to an object, then the other 2 arguments are the arguments that are passed into the callback function when it’s called.
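
One caveat applies to all three methods: an arrow function takes this from the place where it was created, so call, apply and bind can’t change it. The objects in this sketch are made up for illustration.

```javascript
const counter = {
  label: "counter",
  regular() {
    return this.label;
  },
  makeArrow() {
    // The arrow function captures the `this` of makeArrow, which is counter here.
    return () => this.label;
  }
};

const other = { label: "other" };
const arrow = counter.makeArrow();

const regularResult = counter.regular.call(other); // `this` is replaced
const arrowViaCall = arrow.call(other);            // `this` is not replaced
const arrowViaBind = arrow.bind(other)();          // `this` is not replaced

console.log(regularResult, arrowViaCall, arrowViaBind);
```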

In JavaScript functions, there are the call, bind, and apply methods to change the value of this inside a function. In this article, we looked at them closely and how to use them to change the value of this. They only apply to regular functions since the value of this in arrow functions can’t be changed. All 3 are very similar in that they all set the value of this and let us pass in arguments to the function that they’re called on.


List of Express body-parser Errors

By default, Express 4.x or later doesn’t come with anything to parse request bodies. Therefore, we need to add a library to handle this.

body-parser is a middleware that we can use to parse various kinds of request bodies. It throws several kinds of errors that we need to manage. In this article, we’ll look at what errors it throws.

Errors

When body-parser throws an error, it sends a response code and an error message. The message property of the error has the error message.

The following errors are common errors that are thrown.

Content-Encoding Unsupported

This error is thrown when the request has a Content-Encoding header that contains an encoding but the inflate option was set to false.

The status of the response will be 415 and the type property is set to encoding.unsupported. The charset property will be set to the encoding that’s unsupported.

For example, we can reproduce it if we have:

const express = require('express');  
const bodyParser = require('body-parser');  
const app = express();  
const options = {  
  inflate: false,  
};
app.use(bodyParser.urlencoded(options));
app.post('/', (req, res) => {  
  res.send(req.body);  
});
app.listen(3000);

Then when we send a POST request to / with Content-Encoding set to abc and Content-Type set to application/x-www-form-urlencoded, we get:

content encoding unsupported

and a 415 status code.

Request Aborted

This error occurs when the request is aborted by the client before the reading of the body is finished.

The received property will be set to the number of bytes received before the request was aborted.

The status of the response will be 400 and type property set to request.aborted.

Request Entity Too Large

The error will occur when the request body is larger than what’s specified in the limit option.

A 413 error response will be sent with the type property set to entity.too.large.

For example, if we set limit to 0 as follows:

const express = require('express');  
const bodyParser = require('body-parser');  
const app = express();  
const options = {  
  inflate: false,  
  limit: 0  
};
app.use(bodyParser.json(options));
app.post('/', (req, res) => {  
  res.send(req.body);  
});
app.use((err, req, res, next) => {  
  res.status(err.status || 500).send(err);
});
app.listen(3000);

Then when we make a POST request to / with a JSON body that has content, then we get:

{"message":"request entity too large","expected":20,"length":20,"limit":0,"type":"entity.too.large"}

Request Size Didn’t Match Content-Length

This error occurs when the request’s length doesn’t match the length from the Content-Length header.

This typically happens when the request is malformed, usually because Content-Length was calculated based on characters instead of bytes.

The status returned will be 400 and the type property is set to request.size.invalid.

For example, if we have:

const express = require('express');  
const bodyParser = require('body-parser');  
const app = express();  
const options = {  
  inflate: false,    
};
app.use(bodyParser.urlencoded(options));
app.post('/', (req, res) => {  
  res.send(req.body);  
});
app.listen(3000);

and we send a POST request to / with the Content-Length header set to -1 and Content-Type set to application/x-www-form-urlencoded, we get a 400 response.
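
The characters-versus-bytes mismatch is easy to see in Node.js. In this sketch, the body string is a made-up example that contains one non-ASCII character:

```javascript
// "José" written with an escape so that the accented letter is a single code point.
const body = "name=Jos\u00e9";

const characters = body.length;                 // counts UTF-16 code units
const bytes = Buffer.byteLength(body, "utf8");  // counts bytes as sent over the wire

console.log(characters, bytes);
```

A client that sends the character count as Content-Length here would announce one byte fewer than it actually sends.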

Stream Encoding Shouldn’t Be Set

Calling the req.setEncoding method before the body-parser middleware runs will cause this error.

We can’t call req.setEncoding directly when using body-parser.

The status will be 500 and the type property set to stream.encoding.set.

Too Many Parameters

The error occurs when the number of parameters in the request body exceeds what’s set in the parameterLimit option.

The response status will be 413 and the type property is set to parameters.too.many.

Unsupported Content Encoding “BOGUS”

This will occur when the request has a Content-Encoding header that contains an unsupported encoding. The encoding is contained in the message as well as in the encoding property.

The status will be set to 415. The type will be set to encoding.unsupported, and the encoding property is set to the encoding that’s unsupported.
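
Since each of these errors carries a type and a status, one way to handle them in a single place is an error-handling middleware that checks those properties. The following is a minimal sketch rather than a complete solution: the bodyParserErrorHandler name, the JSON response shape, and the fake response object used to try it out are all assumptions made for this example.

```javascript
// Express-style error-handling middleware (it takes four parameters).
function bodyParserErrorHandler(err, req, res, next) {
  if (!err || !err.type) {
    return next(err); // not a body-parser style error, so let another handler deal with it
  }
  const status = err.status || err.statusCode || 500;
  res.status(status).json({ type: err.type, message: err.message });
}

// Minimal stand-in for an Express response object, used only to exercise the handler here.
function createFakeResponse() {
  return {
    statusCode: null,
    body: null,
    status(code) {
      this.statusCode = code;
      return this;
    },
    json(payload) {
      this.body = payload;
      return this;
    }
  };
}

// Simulated error with the properties described for "request entity too large".
const fakeError = Object.assign(new Error("request entity too large"), {
  status: 413,
  type: "entity.too.large"
});

const fakeRes = createFakeResponse();
bodyParserErrorHandler(fakeError, {}, fakeRes, () => {});
console.log(fakeRes.statusCode, fakeRes.body);
```

In an app, a handler like this would be registered with app.use after the routes, so that errors raised by the parsers reach it.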

Conclusion

There’re multiple kinds of errors raised by body-parser.

They involve sending bad headers or data that are not accepted by it, or canceling requests before all the data is read.

Various 400 series error status codes (and a 500 for the stream encoding error) will be sent as the response along with the corresponding error messages and stack trace.


Rxjs Operators — Throttle and Join

Rxjs is a library for doing reactive programming. Creation operators are useful for generating data from various data sources to be subscribed to by Observers.

In this article, we’ll look at the throttle and join operators, including the throttle, throttleTime, combineAll, and concatAll operators.

Filtering Operators

throttle

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable, then repeats the process.

It takes up to 2 arguments. The first is the durationSelector, which is a function that takes a value from the source Observable and returns an Observable or Promise that determines the throttling duration for that value.

The second argument is an optional config object that defines the leading and trailing behavior. It defaults to defaultThrottleConfig, which is { leading: true, trailing: false }.

It returns an Observable that performs the throttle operation from the source.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { throttle } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(throttle(ev => interval(5000)));  
result.subscribe(x => console.log(x));

The code above has an interval$ Observable that emits a number every second. The values from it are piped into the throttle operator, which takes a function that returns the interval(5000) Observable, which emits a number every 5 seconds.

Therefore, after each emitted value, the throttle operator ignores values from interval$ for about 5 seconds, since our durationSelector callback returns interval(5000).

Then we should see a value logged roughly every 5 to 6 seconds. The source value that arrives just as a throttle window ends is typically still ignored, so the logged values are usually 0, 6, 12, and so on rather than exactly every 5th number.
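
To make the windowing rule concrete, here is a simplified, dependency-free model of leading-edge throttling. It isn't how Rxjs implements throttle: throttleModel and durationFor are hypothetical names, events are plain { time, value } objects, and durationFor returns a number of milliseconds where a real durationSelector would return an Observable or Promise. The window is 2500 ms so that no source value lands exactly on a window boundary.

```javascript
// Simplified model of leading-edge throttling over timestamped events.
// `events` must be sorted by `time` (in milliseconds).
function throttleModel(events, durationFor) {
  const emitted = [];
  let silentUntil = -Infinity; // end of the current throttle window
  for (const { time, value } of events) {
    if (time >= silentUntil) {
      emitted.push(value);
      // Each emitted value opens a new window of its own length.
      silentUntil = time + durationFor(value);
    }
    // Values arriving inside the window are dropped.
  }
  return emitted;
}

// Ticks once per second, like interval(1000): value n arrives at (n + 1) * 1000 ms.
const ticks = Array.from({ length: 10 }, (_, n) => ({
  time: (n + 1) * 1000,
  value: n
}));

console.log(throttleModel(ticks, () => 2500)); // [0, 3, 6, 9]

// The window can depend on the value, as with a durationSelector.
console.log(throttleModel(ticks, v => (v === 0 ? 1500 : 3500))); // [0, 2, 6]
```

Passing a function for the duration, even when it returns a constant, mirrors how throttle asks for a new duration after every emitted value.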

throttleTime

throttleTime emits a value from the source Observable, then ignores subsequently emitted values for duration milliseconds, then repeats the process.

It takes up to 3 arguments. The first is the duration, which is the time to wait before emitting another value after emitting the last value. It’s measured in milliseconds or the time unit of the optional scheduler.

The second argument is the optional scheduler, which defaults to async. It’s used for setting the timing of the emission.

The last argument is the config, which is optional. We can pass in an object to define the leading and trailing behavior. It defaults to defaultThrottleConfig, which is { leading: true, trailing: false }.

For example, we can use it as follows:

import { interval } from "rxjs";  
import { throttleTime } from "rxjs/operators";

const interval$ = interval(1000);  
const result = interval$.pipe(throttleTime(5000));  
result.subscribe(x => console.log(x));

The above works like our previous throttle example, except that we change throttle(ev => interval(5000)) to throttleTime(5000), which does the same thing.

We emit a number from interval$, then ignore further numbers for 5 seconds, and repeat.

Then we get the same numbers logged as the example above.

Join Operators

combineAll

The combineAll operator flattens an Observable of Observables by applying combineLatest once the outer Observable completes. In RxJS 7 and later, the same operator is also available as combineLatestAll, and combineAll is deprecated.

It takes one optional argument, which is a project function that maps the latest values from the inner Observables to the value we want to emit.

Once the outer Observable completes, it subscribes to all collected Observables and combines their values as follows:

  • Every time an inner Observable emits, the output Observable emits, provided every inner Observable has emitted at least once
  • If a project function is specified, it’s called with the most recent value from each inner Observable, and its return value is emitted by the returned Observable
  • If there’s no project function, an array of the most recent values is emitted by the returned Observable

For example, we can use it as follows:

import { of } from "rxjs";  
import { map, combineAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(combineAll());  
result.subscribe(x => console.log(x));

The code above maps the of$ Observable to child Observables. It returns of("a", "b", "c") for each value emitted by of$.

Then we use combineAll to combine the latest values from each of the child Observables in higherOrderObservable into an array. Then each of these arrays is emitted by the result Observable.

Then we get:

["c", "c", "a"]  
["c", "c", "b"]  
["c", "c", "c"]

as the console.log output. The first 2 inner Observables emit all of their values and complete before the third one is subscribed to, so we get 'c' from them. Then, as the third one emits, the last value in the array is updated, so we get 'a', 'b' and 'c' respectively from it.
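
The ordering above can be reproduced with a small model that needs no dependencies. It assumes inner sources that emit synchronously, like of("a", "b", "c"), and represents each one as a plain array. combineLatestModel is a hypothetical name, and this is a sketch of the combining rule rather than the Rxjs implementation.

```javascript
// Marker for "this inner source has not emitted yet".
const UNSET = Symbol("unset");

// Simplified model of combineLatest over synchronous inner sources.
function combineLatestModel(sources, project) {
  const latest = sources.map(() => UNSET);
  const output = [];
  // Sources are subscribed to one after another, in order.
  sources.forEach((values, index) => {
    for (const value of values) {
      latest[index] = value;
      // Nothing is emitted until every source has produced a value.
      if (!latest.includes(UNSET)) {
        output.push(project ? project(...latest) : [...latest]);
      }
    }
  });
  return output;
}

const inner = [
  ["a", "b", "c"],
  ["a", "b", "c"],
  ["a", "b", "c"]
];

console.log(combineLatestModel(inner));
// [["c","c","a"], ["c","c","b"], ["c","c","c"]]

console.log(combineLatestModel(inner, (x, y, z) => x + y + z));
// ["cca", "ccb", "ccc"]
```

The second call shows the effect of a project function: it receives the latest value from each inner source and its return value is emitted in place of the array.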

concatAll

The concatAll operator converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

It takes no parameters and returns an Observable that emits the values from all inner Observables concatenated.

For example, we can use it as follows:

import { of } from "rxjs";  
import { map, concatAll } from "rxjs/operators";

const of$ = of(1, 2, 3);  
const higherOrderObservable = of$.pipe(map(val => of("a", "b", "c")));  
const result = higherOrderObservable.pipe(concatAll());  
result.subscribe(x => console.log(x));

The of$ Observable’s emitted values are piped, and each value from of$ is mapped to the of("a", "b", "c") Observable.

Then the inner Observables in higherOrderObservable, which are the 3 of("a", "b", "c") Observables mapped from of$, have their emitted values concatenated in order by concatAll.

In the end, we get:

a  
b  
c  
a  
b  
c  
a  
b  
c
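
The key property of concatAll is that an inner Observable is only subscribed to after the previous one has completed. The sketch below models that with lazy generators in place of Observables. innerSource and concatAllModel are hypothetical names, and the log array exists only to make the start and end order visible.

```javascript
// Records when each inner source starts and ends.
const log = [];

// A lazy inner source: its body does not run until it is iterated.
function* innerSource(id) {
  log.push(`start ${id}`);
  yield* ["a", "b", "c"];
  log.push(`end ${id}`);
}

// Simplified model of concatAll: drain each inner source fully, in order.
function* concatAllModel(outer) {
  for (const inner of outer) {
    yield* inner;
  }
}

const outer = [1, 2, 3].map(id => innerSource(id));
const values = [...concatAllModel(outer)];

console.log(values.join(" ")); // a b c a b c a b c
console.log(log.join(", "));
// start 1, end 1, start 2, end 2, start 3, end 3
```

The log shows that the second source doesn't start until the first has ended, which is why the values never interleave.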

Conclusion

The throttle operator emits values from the source Observable, then ignores subsequently emitted values from the source for a duration determined by another Observable and repeats.

throttleTime emits a value from the source Observable then ignores subsequently emitted values for a given amount of time and repeats.

combineAll flattens an Observable of Observables by combining the latest values of the inner Observables once the outer Observable completes.

The concatAll operator concatenates the emitted values of inner Observables in order.