
What are RxJS Schedulers?

Schedulers are entities that control when a subscription starts and when notifications are delivered. A Scheduler has 3 components:

  • data structure — It knows how to store and queue tasks based on priority or other criteria.
  • execution context — denotes where and when tasks are executed. It can be immediate or deferred until later.
  • virtual clock — tasks are run relative to the time denoted by the now() getter method on the scheduler.
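
To make the three components concrete, here is a minimal sketch of a toy scheduler in plain JavaScript. It is not how RxJS implements its Schedulers: the ToyScheduler class, its flush() method and the tick-based clock are all hypothetical names made up for this illustration.

```javascript
// Toy scheduler: NOT the RxJS implementation, only an illustration.
class ToyScheduler {
  constructor() {
    this.queue = []; // data structure: pending tasks, kept sorted by due time
    this.time = 0;   // virtual clock, measured in made-up "ticks"
  }

  // Virtual clock: tasks are scheduled relative to this value.
  now() {
    return this.time;
  }

  // Store a task to run `delay` ticks after the current virtual time.
  schedule(work, delay = 0) {
    this.queue.push({ due: this.now() + delay, work });
    // The sort is stable, so tasks with equal due times keep insertion order.
    this.queue.sort((a, b) => a.due - b.due);
  }

  // Execution context: in this sketch, tasks run only when flush() is called.
  flush() {
    while (this.queue.length > 0) {
      const { due, work } = this.queue.shift();
      this.time = due; // jump the clock forward instead of really waiting
      work();
    }
  }
}

const scheduler = new ToyScheduler();
const log = [];
scheduler.schedule(() => log.push(`b at ${scheduler.now()}`), 500);
scheduler.schedule(() => log.push(`a at ${scheduler.now()}`), 100);
scheduler.flush();
console.log(log); // [ 'a at 100', 'b at 500' ]
```

Because the clock is virtual, the 500 ticks pass instantly, which is the same idea that makes Scheduler-based code easy to test.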

Defining a Scheduler

We can use a Scheduler as follows:

import { of, asyncScheduler } from "rxjs";  
import { observeOn } from "rxjs/operators";
of(1, 2, 3)  
  .pipe(observeOn(asyncScheduler))  
  .subscribe(val => console.log(val));

The code turns our synchronous Observable into an asynchronous one by delivering its notifications through the asyncScheduler.

We’ll notice the difference if we compare the output of the synchronous example below:

import { of } from "rxjs";
const observable = of(1, 2, 3);
console.log("before sync subscribe");  
observable.subscribe({  
  next(x) {  
    console.log(`got sync value ${x}`);  
  },  
  error(err) {  
    console.error(`something wrong occurred: ${err}`);  
  },  
  complete() {  
    console.log("done");  
  }  
});  
console.log("after sync subscribe");

And the asynchronous example:

import { of, asyncScheduler } from "rxjs";  
import { observeOn } from "rxjs/operators";
const observable = of(1, 2, 3);
console.log("before async subscribe");  
observable.pipe(observeOn(asyncScheduler)).subscribe({  
  next(x) {  
    console.log(`got async value ${x}`);  
  },  
  error(err) {  
    console.error(`something wrong occurred: ${err}`);  
  },  
  complete() {  
    console.log("done");  
  }  
});  
console.log("after async subscribe");

We see that the synchronous example outputs:

before sync subscribe  
got sync value 1  
got sync value 2  
got sync value 3  
done  
after sync subscribe

The asynchronous example with the asyncScheduler gets us the following output:

before async subscribe  
after async subscribe  
got async value 1  
got async value 2  
got async value 3  
done

As we can see, observeOn(asyncScheduler) defers the delivery of the values until after the synchronous code has run.

The asyncScheduler schedules work on a timer, as if we had called setTimeout(task, delay). Even if the delay is zero, the task still goes through the timer, so it runs in a later iteration of the event loop instead of synchronously.

The asyncScheduler has a schedule() method that takes a delay argument, which is an amount of time measured on the Scheduler’s own internal clock. The internal clock doesn’t have to follow the actual clock time. Temporal operations are dictated by the Scheduler’s clock, not the real clock.

This is useful for testing since we can easily fake the time for testing while the tasks are run asynchronously.

Scheduler Types

The asyncScheduler is one of several types of Schedulers available in RxJS:

  • null: notifications are delivered synchronously and recursively. Useful for constant-time or tail-recursive operations.
  • queueScheduler: schedules on a queue in the current event frame. Useful for iteration.
  • asapScheduler: schedules on the microtask queue, which is the same one used for promises.
  • asyncScheduler: used for time-based operations. Work is scheduled with setInterval.
  • animationFrameScheduler: schedules tasks to run just before the next browser content repaint. It can be used to smooth browser animations.
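
The difference between the microtask queue and timers can be seen without RxJS at all. The following sketch uses only plain JavaScript, with a promise callback standing in for work scheduled the asap way and setTimeout standing in for work scheduled the async way. It only illustrates the underlying queues and is not RxJS code.

```javascript
// Plain JavaScript, no RxJS: shows the queues the list above mentions.
const order = [];

order.push("sync start");

// Timer task: runs in a later turn of the event loop.
setTimeout(() => order.push("timer"), 0);

// Microtask: runs right after the current synchronous code finishes.
Promise.resolve().then(() => order.push("microtask"));

order.push("sync end");

// Print once everything above has had a chance to run.
setTimeout(() => console.log(order.join(" -> ")), 10);
// sync start -> sync end -> microtask -> timer
```

The microtask runs before the timer even though the timer was registered first, which is why microtask-based scheduling is a good fit for work that should happen as soon as possible.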

Using Schedulers

The following functions take a scheduler argument:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throwError (called throw before RxJS 6)
  • timer

We’re already using Schedulers even when we don’t specify one explicitly. RxJS picks a default Scheduler by finding one that introduces the least amount of concurrency while still doing the job.

For example, async is used for timed operations, and null or undefined is used for simpler ones that emit a finite number of messages.

Some operators also take a Scheduler argument. They include time-related ones like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, and windowTime.

Other instance operators like cache, combineLatest, concat, expand, merge, publishReplay, startWith also take a Scheduler argument.

With Schedulers, we can control when data are emitted by Observables. There’re different kinds of Schedulers which RxJS will pick automatically based on the principle of creating the least concurrency to do the job.

Schedulers run tasks according to their own clock, which doesn’t have to match the real world’s clock.

Many operators allow us to set a Scheduler to meet our needs.


Where’s the Sleep Function in JavaScript?

If we try to search for the sleep function in JavaScript, we won’t find it. However, we can easily make one with existing functions.

setTimeout Function

There’s a setTimeout function that lets us run code after a specified number of milliseconds. We can use it to create our own sleep function.

Like most time-sensitive JavaScript code, it’s going to be asynchronous since this won’t hold up the main thread running our program.

We can create a function that runs code after a specified amount of time by writing:

const runLater = (delay) => {  
  setTimeout(() => {  
    console.log('bar');  
  }, delay)  
}

The runLater function logs the string 'bar' after the delay we pass in, which is given in milliseconds.

If we run:

console.log('foo')  
runLater(100);  
console.log('baz')

We’ll see that we get:

foo  
baz  
bar

This is because the synchronous code, which is the first and last lines, runs first. The callback that runLater(100) passes to setTimeout is queued to run once at least 100 milliseconds have passed.

Then, in a later iteration of the event loop, the code in the setTimeout callback runs, which logs 'bar'.

This means that if we want to run code in callbacks sequentially, we have to nest the callbacks. And too many callbacks create callback hell.
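
As a small illustration of that nesting, the sketch below runs three steps in order with setTimeout alone. The steps array and the 20 millisecond delays are arbitrary choices for this example.

```javascript
// Each step must be nested inside the previous callback to run in order.
const steps = [];

setTimeout(() => {
  steps.push("first");
  setTimeout(() => {
    steps.push("second");
    setTimeout(() => {
      steps.push("third");
      console.log(steps); // [ 'first', 'second', 'third' ]
    }, 20);
  }, 20);
}, 20);
```

Every extra step adds another level of indentation, which is the shape people call callback hell.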

Promises

Therefore, the best way to run asynchronous JavaScript code sequentially is to use promises. We can create a promise with the setTimeout function by using the Promise constructor.

We can write the sleep function as follows:

const sleep = (delay) => {  
  return new Promise(resolve => {  
    setTimeout(resolve, delay)  
  });  
}

The sleep function returns a promise that resolves when the setTimeout callback, which is the resolve function, is called after delay milliseconds.

A promise is asynchronous like the setTimeout callback. The good thing about it is that we can run them sequentially as they’re fulfilled.

Fulfilled means that the resolve method in the callback above is called. A promise can also be rejected with the reject function when an error occurs.

Using promises lets us chain asynchronous code and run it sequentially.
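
Here is a short sketch of such a chain using then() and catch(). The sleep helper is repeated so the snippet runs on its own, and the seen array and the 'boom' error are made up for the example.

```javascript
// Same sleep helper as in the article, repeated so the snippet is self-contained.
const sleep = (delay) => new Promise((resolve) => setTimeout(resolve, delay));

const seen = [];

const chain = sleep(20)
  .then(() => {
    seen.push("one");
    return sleep(20); // returning a promise makes the next then() wait for it
  })
  .then(() => {
    seen.push("two");
    throw new Error("boom"); // a thrown error rejects the chain
  })
  .then(() => seen.push("never reached")) // skipped because of the rejection
  .catch((err) => {
    seen.push(`caught ${err.message}`);
  })
  .then(() => console.log(seen)); // [ 'one', 'two', 'caught boom' ]
```

Each then() callback runs only after the previous promise is fulfilled, and a rejection skips ahead to the nearest catch().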

We can do this cleanly with async functions. To define an async function, we use the async and await keywords as follows:

(async ()=>{  
  console.log('foo');  
  await sleep(2000);  
  console.log('bar');  
})();

If we run the code above, we should see that 'foo' is logged, then 2 seconds after that 'bar' is logged.

This lets us set up time-delayed code without hanging our app.

The number 2000 in the argument is in milliseconds, so it’ll wait 2 seconds before running the next line.

await tells the JavaScript engine to hold off on executing the next line of the async function until the awaited promise is settled. Code outside the async function keeps running in the meantime. We can put await in front of any promise to indicate that we should wait for the promise to resolve before continuing.
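
The sketch below shows that await pauses only the async function it appears in, while the code after the function call carries on right away. The events array and the 50 millisecond delay are arbitrary choices for the example.

```javascript
// Same sleep helper as in the article, repeated so the snippet is self-contained.
const sleep = (delay) => new Promise((resolve) => setTimeout(resolve, delay));

const events = [];

const task = (async () => {
  events.push("task: start");
  await sleep(50); // only this async function is paused here
  events.push("task: after await");
})();

// This line runs immediately; it does not wait for the sleep above.
events.push("main: still running");

task.then(() => console.log(events));
// [ 'task: start', 'main: still running', 'task: after await' ]
```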

We can keep repeating this:

(async () => {  
  console.log('foo');  
  await sleep(2000);  
  console.log('bar');  
  await sleep(2000);  
  console.log('a');  
  await sleep(2000);  
  console.log('b');  
})();

But it gets repetitive. Fortunately, we can use loops to eliminate this repetition.

For-Await-Of Loop

We have the for-await-of loop, which loops through asynchronous iterable objects as well as synchronous ones like arrays, and waits for each value before running the loop body.

To do this, we can write:

(async () => {  
  const arr = ['foo', 'bar', 'a', 'b'];  
  for await (let a of arr) {  
    console.log(a);  
    await sleep(2000);  
  }  
})();

The code above is much cleaner and it doesn’t matter how many things we want to loop through.
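
With a plain array, a regular for-of loop with await in its body would behave the same way. The for-await-of loop becomes necessary when the values themselves arrive asynchronously, for example from an async generator. The sketch below assumes a hypothetical slowWords generator written only for this illustration.

```javascript
// Same sleep helper as in the article, repeated so the snippet is self-contained.
const sleep = (delay) => new Promise((resolve) => setTimeout(resolve, delay));

// Hypothetical async generator: yields each word after a short pause.
async function* slowWords(words, delay) {
  for (const word of words) {
    await sleep(delay);
    yield word;
  }
}

const received = [];

const done = (async () => {
  // for await...of waits for each value the async iterable produces.
  for await (const word of slowWords(["foo", "bar", "a", "b"], 20)) {
    received.push(word);
  }
  console.log(received); // [ 'foo', 'bar', 'a', 'b' ]
})();
```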

Whether we use loops or not, await only pauses the function when it’s given a promise, so it’s important that sleep actually returns one.

We can also return a promise from an async function, so we can write:

(async () => {  
  const arr = ['foo', 'bar', 'a', 'b'];  
  for await (let a of arr) {  
    console.log(a);  
    await sleep(2000);  
  }  
  return sleep(2000);  
})();

to return the sleep(2000) promise.

Also, we can give it a name and use it in other async functions as follows:

const foo = async () => {  
  const arr = ['foo', 'bar', 'a', 'b'];  
  for await (let a of arr) {  
    console.log(a);  
    await sleep(2000);  
  }  
  return sleep(2000);  
};

(async () => {  
  await foo();  
  console.log('c');  
})();

This shows that async functions return promises, which is why we can await them in other async functions.
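
A quick way to check this is to call an async function without await and inspect what comes back. The answer function below is a made-up example.

```javascript
// An async function is still a function; calling it produces a promise.
const answer = async () => 42;

console.log(typeof answer); // function

const result = answer();
console.log(result instanceof Promise); // true

// The returned value becomes the promise's fulfillment value.
result.then((value) => console.log(value)); // 42
```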

Conclusion

In JavaScript, there’s no sleep function like in other languages. However, we can make our own easily.

With promises, we can solve timing issues easily. It makes time-delayed code execution clean and readable. We can write time-delayed JavaScript code by using the setTimeout function.

The async and await syntax makes reading and writing the code a breeze.

Finally, we can call async functions in other async functions since these functions always return promises.