A Scheduler controls when a subscription starts and when notifications are delivered. It has three components:
- data structure: it knows how to store and queue tasks based on priority or other criteria.
- execution context: it denotes where and when tasks are executed, either immediately or deferred until later.
- virtual clock: tasks are run relative to the time returned by the now() getter method on the scheduler.
Defining a Scheduler
We can apply a scheduler to an Observable as follows:
import { of, asyncScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";
of(1, 2, 3)
  .pipe(observeOn(asyncScheduler))
  .subscribe(val => console.log(val));
The code turns our synchronous Observable into an asynchronous Observable by using the asyncScheduler.
We’ll notice the difference if we compare the output of the synchronous example below:
import { of } from "rxjs";
const observable = of(1, 2, 3);
console.log("before sync subscribe");
observable.subscribe({
  next(x) {
    console.log(`got sync value ${x}`);
  },
  error(err) {
    console.error(`something wrong occurred: ${err}`);
  },
  complete() {
    console.log("done");
  }
});
console.log("after sync subscribe");
And the asynchronous example:
import { of, asyncScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";
const observable = of(1, 2, 3);
console.log("before async subscribe");
observable.pipe(observeOn(asyncScheduler)).subscribe({
  next(x) {
    console.log(`got async value ${x}`);
  },
  error(err) {
    console.error(`something wrong occurred: ${err}`);
  },
  complete() {
    console.log("done");
  }
});
console.log("after async subscribe");
We see that the synchronous example outputs:
before sync subscribe
got sync value 1
got sync value 2
got sync value 3
done
after sync subscribe
The asynchronous example with the asyncScheduler gets us the following output:
before async subscribe
after async subscribe
got async value 1
got async value 2
got async value 3
done
As we can see, the asyncScheduler defers delivery of the Observable's notifications until after the synchronous code has run.
The asyncScheduler operates with setTimeout or setInterval. Even if the delay is zero, the work still goes through setTimeout or setInterval, so it runs in a later iteration of the event loop.
The asyncScheduler has a schedule() method that takes a delay argument, which refers to a quantity of time relative to the Scheduler's own internal clock. That internal clock need not have any relation to the actual wall-clock time. Temporal operations are dictated by the Scheduler's clock, not the real clock.
This is useful for testing, since we can fake the passage of time while the tasks still run asynchronously.
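To make the idea of a Scheduler-owned clock concrete, here is a minimal sketch of a toy scheduler whose time only moves when its queue is flushed. It is not RxJS's implementation: the class name ToyVirtualScheduler, its flush() method, and the "tick" unit are all hypothetical and exist only for illustration. The sketch also maps onto the three components listed earlier: a queue (data structure), flush() (execution context), and now() (virtual clock).

```javascript
// Toy model only: NOT RxJS's implementation. All names here are hypothetical.
class ToyVirtualScheduler {
  constructor() {
    this.clock = 0;  // virtual clock, in arbitrary "ticks"
    this.queue = []; // data structure: pending tasks
    this.nextId = 0; // tie-breaker so equal due times keep insertion order
  }

  // Virtual "current time"; never consults Date.now().
  now() {
    return this.clock;
  }

  // Queue `work` to run `delay` ticks after the current virtual time.
  schedule(work, delay = 0) {
    this.queue.push({ due: this.clock + delay, id: this.nextId++, work });
    this.queue.sort((a, b) => a.due - b.due || a.id - b.id);
  }

  // Execution context: run every queued task in due-time order,
  // jumping the virtual clock forward instead of actually waiting.
  flush() {
    while (this.queue.length > 0) {
      const task = this.queue.shift();
      this.clock = task.due;
      task.work(this);
    }
  }
}

const scheduler = new ToyVirtualScheduler();
const log = [];
scheduler.schedule(s => log.push(`second at ${s.now()}`), 2000);
scheduler.schedule(s => log.push(`first at ${s.now()}`), 1000);
scheduler.flush(); // returns immediately; no real time passes
console.log(log);  // [ 'first at 1000', 'second at 2000' ]
```

Because the clock advances only inside flush(), a test built on a scheduler like this can assert on "2000 ticks later" without waiting for any real time to elapse.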
Scheduler Types
In addition to the asyncScheduler, there are other types of Schedulers available in RxJS:
- null: notifications are delivered synchronously and recursively. Useful for constant-time or tail-recursive operations.
- queueScheduler: schedules on a queue in the current event frame. Useful for iteration.
- asapScheduler: schedules on the microtask queue, which is the same one used for promises.
- asyncScheduler: used for time-based operations. Work is scheduled with setInterval.
- animationFrameScheduler: schedules tasks to run just before the next browser content repaint. It can be used to smooth browser animations.
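The relative ordering of these Schedulers follows from the JavaScript queues they sit on. The sketch below does not use RxJS at all; it uses a promise callback and setTimeout directly as stand-ins for asapScheduler and asyncScheduler, which is an analogy rather than the library's actual code path.

```javascript
// Stand-ins only: plain JavaScript queues, not RxJS Schedulers.
const order = [];

// Timer (macrotask) queue: roughly where asyncScheduler work lands.
setTimeout(() => order.push("timer"), 0);

// Microtask queue: the same queue promise callbacks use (asapScheduler analogue).
Promise.resolve().then(() => order.push("microtask"));

// Plain synchronous code: comparable to using no Scheduler at all.
order.push("synchronous");

// Report once every queue above has drained.
setTimeout(() => console.log(order.join(" -> ")), 10);
// prints "synchronous -> microtask -> timer"
```

Synchronous work finishes first, microtasks run as soon as the current call stack empties, and timer callbacks wait for a later turn of the event loop.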
Using Schedulers
The following functions take a scheduler argument:
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throwError
timer
We're already using Schedulers without specifying them explicitly. RxJS picks a default scheduler by finding one that introduces the least amount of concurrency and does the job.
For example, for timed operations, async is used; for simpler ones that emit a finite number of messages, null or undefined is used.
Some operators also take a Scheduler argument. They include time-related ones like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, windowTime.
Other instance operators like cache, combineLatest, concat, expand, merge, publishReplay, startWith also take a Scheduler argument.
With Schedulers, we can control when data is emitted by Observables. There are different kinds of Schedulers, and RxJS picks one automatically based on the principle of introducing the least concurrency needed to do the job.
Schedulers run tasks according to their own clock, which need not match the real-world clock.
Many operators allow us to set a Scheduler to meet our needs.