What are RxJS Schedulers?

Schedulers are entities that control when a subscription starts and when notifications are delivered. A scheduler has three components:

  • data structure: stores and queues tasks based on priority or other criteria.
  • execution context: determines where and when tasks are executed, either immediately or deferred until later.
  • virtual clock: tasks are run relative to the time reported by the now() getter method on the scheduler.
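To make the three components concrete, here is a minimal sketch in plain JavaScript. It is not RxJS source code: the SketchScheduler class and its flush() method are hypothetical names made up for illustration, and a real scheduler does considerably more.

```javascript
// Hypothetical sketch of the three components; not how RxJS implements them.
class SketchScheduler {
  constructor() {
    this.queue = []; // data structure: pending tasks, kept sorted by due time
    this.time = 0;   // virtual clock: only moves when flush() runs a task
  }

  // Virtual clock getter: tasks are scheduled relative to this value.
  now() {
    return this.time;
  }

  schedule(work, delay = 0) {
    this.queue.push({ due: this.now() + delay, work });
    // Array.prototype.sort is stable, so equal due times keep insertion order.
    this.queue.sort((a, b) => a.due - b.due);
  }

  // Execution context: tasks run synchronously, but only when flush() is called.
  flush() {
    while (this.queue.length > 0) {
      const task = this.queue.shift();
      this.time = task.due; // jump the clock to the task's due time
      task.work();
    }
  }
}

const sketchScheduler = new SketchScheduler();
const sketchLog = [];
sketchScheduler.schedule(() => sketchLog.push(`late at ${sketchScheduler.now()}`), 100);
sketchScheduler.schedule(() => sketchLog.push(`early at ${sketchScheduler.now()}`), 10);
sketchLog.push("scheduled");
sketchScheduler.flush();
console.log(sketchLog); // [ 'scheduled', 'early at 10', 'late at 100' ]
```

The task scheduled second runs first because the queue orders by due time, and no real time passes because the clock is just a number.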

Defining a Scheduler

We can apply a built-in scheduler to an Observable as follows:

import { of, asyncScheduler } from "rxjs";  
import { observeOn } from "rxjs/operators";
of(1, 2, 3)  
  .pipe(observeOn(asyncScheduler))  
  .subscribe(val => console.log(val));

The code turns our synchronous Observable into an asynchronous one by delivering its notifications through the asyncScheduler.

We’ll notice the difference if we compare the output of the synchronous example below:

import { of } from "rxjs";
const observable = of(1, 2, 3);
console.log("before sync subscribe");  
observable.subscribe({  
  next(x) {  
    console.log(`got sync value ${x}`);  
  },  
  error(err) {  
    console.error(`something wrong occurred: ${err}`);  
  },  
  complete() {  
    console.log("done");  
  }  
});  
console.log("after sync subscribe");

And the asynchronous example:

import { of, asyncScheduler } from "rxjs";  
import { observeOn } from "rxjs/operators";
const observable = of(1, 2, 3);
console.log("before async subscribe");  
observable.pipe(observeOn(asyncScheduler)).subscribe({  
  next(x) {  
    console.log(`got async value ${x}`);  
  },  
  error(err) {  
    console.error(`something wrong occurred: ${err}`);  
  },  
  complete() {  
    console.log("done");  
  }  
});  
console.log("after async subscribe");

We see that the synchronous example outputs:

before sync subscribe  
got sync value 1  
got sync value 2  
got sync value 3  
done  
after sync subscribe

The asynchronous example with the asyncScheduler gets us the following output:

before async subscribe  
after async subscribe  
got async value 1  
got async value 2  
got async value 3  
done

As we can see, observeOn(asyncScheduler) defers delivery of the Observable's values until after the surrounding synchronous code has run.

The asyncScheduler is built on JavaScript timers (RxJS schedules its work with setInterval). Even if the delay is zero, the task still goes through the timer, so it runs in a later event loop iteration, after the current synchronous code has finished.
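The ordering can be reproduced without RxJS. The following is a small sketch in plain JavaScript, assuming a one-shot task built on a zero-delay setInterval that is cleared on its first tick; a promise callback is included only for contrast, and the variable names are made up for the example.

```javascript
const order = [];

// One-shot task on a zero-delay timer: it still waits for a later
// event loop iteration, even though the delay is 0.
const timerId = setInterval(() => {
  clearInterval(timerId); // run once, then stop the interval
  order.push("timer");
  console.log(order.join(" -> ")); // sync -> microtask -> timer
}, 0);

// Microtask: runs after the current synchronous code, before any timer.
Promise.resolve().then(() => order.push("microtask"));

// Plain synchronous code runs first.
order.push("sync");
```

The timer callback is registered first but runs last, which is the same effect the asyncScheduler example above produces.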

The asyncScheduler has a schedule() method that takes a delay argument, which is a quantity of time relative to the Scheduler's own internal clock. That internal clock need not have anything to do with actual wall-clock time. Temporal operations are dictated by the Scheduler's clock, not the real clock.

This is useful for testing since we can easily fake the time for testing while the tasks are run asynchronously.
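As a rough illustration of why this helps in tests, the sketch below passes a scheduler-like object into the code under test and then moves time forward by hand. RxJS ships its own TestScheduler and VirtualTimeScheduler for this purpose; the remindLater function and the createFakeScheduler helper here are hypothetical stand-ins written for this example, not RxJS APIs.

```javascript
// Hypothetical code under test: it asks the scheduler for the time and for
// timers instead of calling Date.now() or setTimeout() directly.
function remindLater(scheduler, delayMs, onReminder) {
  const requestedAt = scheduler.now();
  scheduler.schedule(() => onReminder(scheduler.now() - requestedAt), delayMs);
}

// Hypothetical fake scheduler: its clock moves only when advanceBy() is called.
function createFakeScheduler() {
  let time = 0;
  const tasks = [];
  return {
    now: () => time,
    schedule(work, delay = 0) {
      tasks.push({ due: time + delay, work });
    },
    advanceBy(ms) {
      const target = time + ms;
      for (;;) {
        tasks.sort((a, b) => a.due - b.due); // earliest due task first
        if (tasks.length === 0 || tasks[0].due > target) break;
        const task = tasks.shift();
        time = task.due; // the task observes its own due time via now()
        task.work();
      }
      time = target;
    }
  };
}

const fakeScheduler = createFakeScheduler();
const elapsed = [];
remindLater(fakeScheduler, 5000, ms => elapsed.push(ms));

fakeScheduler.advanceBy(4999);
const firedEarly = elapsed.length > 0; // false: 5000 ms have not "passed" yet

fakeScheduler.advanceBy(1);
console.log(firedEarly, elapsed); // false [ 5000 ]
```

Five seconds of scheduler time pass instantly, so a test like this stays fast and deterministic.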

Scheduler Types

In addition to the asyncScheduler, there are other types of Schedulers available in RxJS:

  • null: no scheduler is passed, so notifications are delivered synchronously and recursively. Useful for constant-time or tail-recursive operations.
  • queueScheduler: schedules on a queue in the current event frame. Useful for iteration.
  • asapScheduler: schedules on the microtask queue, which is the same one used for promises.
  • asyncScheduler: used for time-based operations. Work is scheduled with setInterval.
  • animationFrameScheduler: schedules tasks to run just before the next browser content repaint. It can be used to create smooth browser animations.
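The difference between synchronous, recursive delivery and delivery through a queue in the current event frame can be sketched in plain JavaScript. This is only an illustration of the idea, not the RxJS implementation; scheduleImmediately, createQueue, and trace are hypothetical helpers.

```javascript
// No scheduler: nested work runs right away, recursively.
function scheduleImmediately(work) {
  work();
}

// Hypothetical trampoline in the spirit of a queue-based scheduler: work
// scheduled while a task is running waits until that task has finished.
function createQueue() {
  const pending = [];
  let draining = false;
  return function schedule(work) {
    pending.push(work);
    if (draining) return; // a task is already running, so just enqueue
    draining = true;
    try {
      while (pending.length > 0) {
        const next = pending.shift();
        next();
      }
    } finally {
      draining = false;
    }
  };
}

// Runs an outer task that schedules an inner task, and records the order.
function trace(schedule) {
  const log = [];
  schedule(() => {
    log.push("outer start");
    schedule(() => log.push("inner"));
    log.push("outer end");
  });
  return log;
}

const immediateLog = trace(scheduleImmediately);
const queuedLog = trace(createQueue());
console.log(immediateLog); // [ 'outer start', 'inner', 'outer end' ]
console.log(queuedLog);    // [ 'outer start', 'outer end', 'inner' ]
```

Both versions finish synchronously. The queue only changes the order, turning recursion into iteration, which keeps the call stack flat.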

Using Schedulers

The following functions take a scheduler argument:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise (RxJS 5 only; in RxJS 6 and later, from accepts promises)
  • interval
  • merge
  • of
  • range
  • throwError (named throw in RxJS 5)
  • timer

We're already using Schedulers even when we don't specify one explicitly. RxJS picks a default scheduler by choosing the one that introduces the least amount of concurrency while still doing the job.

For example, timed operations use async, while simpler ones that emit a finite number of values use null or undefined, which means no scheduler and synchronous delivery.
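The idea of an optional scheduler argument with a synchronous default can be sketched as follows. This is plain JavaScript written for illustration: emitRange and timeoutScheduler are hypothetical names, and the stand-in scheduler simply defers each task with setTimeout rather than mirroring any RxJS class.

```javascript
// Hypothetical creation function with an optional scheduler argument.
function emitRange(start, count, next, scheduler) {
  for (let i = 0; i < count; i++) {
    const value = start + i;
    if (scheduler) {
      scheduler.schedule(() => next(value)); // the caller asked for a scheduler
    } else {
      next(value); // default: no scheduler, so no added concurrency
    }
  }
}

// Assumed stand-in for a real scheduler: defers each task with setTimeout.
const timeoutScheduler = {
  schedule: work => setTimeout(work, 0)
};

const syncValues = [];
emitRange(1, 3, v => syncValues.push(v));
console.log(syncValues); // [ 1, 2, 3 ]

const deferredValues = [];
emitRange(1, 3, v => deferredValues.push(v), timeoutScheduler);
console.log(deferredValues); // [] because the values arrive in later timer callbacks
```

Without a scheduler, all values are delivered before emitRange returns. With one, the same values arrive later, in the same order.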

Some operators also take a Scheduler argument. They include time-related ones like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, and windowTime.

Other instance operators like cache, combineLatest, concat, expand, merge, publishReplay, startWith also take a Scheduler argument.

With Schedulers, we can control when Observables emit data. There are different kinds of Schedulers, and RxJS picks one automatically on the principle of introducing the least concurrency needed to do the job.

Schedulers run tasks according to their own clock, which need not match the real-world clock.

Many operators allow us to set a Scheduler to meet our needs.

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
