Rxjs调度器
本文采用 RxjsV5.5
版本,这个版本和先前版本最大不同之处是import
方式,以及引入了 pipe
操作符来替代链式操作,在此不赘述。
今天主要是来将调度器scheduler
,这个通常用的比较少,但是了解它还是有作用的。
顾名思义,调度器可以理解为调度Observable,Observer,根据 官方scheduler文档 可以知道它有以下几个作用:
- 调度器是一种数据结构。它可以依据优先级和其它一些配置知道如何来存储队列任务(
queue tasks
) - 调度器可以充当执行环境。这表示任务什么时候什么地方执行,是立即执行,还是在回调函数中执行(使用
setTimeout
或者setInterval
, 又或者animation frame
) - 调度器拥有一个虚拟时钟。它通过getter方法
now()
提供了time
的概念。任务会根据调度安排,在特定的时间执行。
Marco && microtask 宏任务和微任务的概念
像 setTimeout | setInterval
都归属于宏任务,而 promises
属于微任务。
宏任务:
setTimeout
setInterval
setImmediate
I/O
UI rendering
微任务:
process.nextTick
Promise
MutationObserver
const log = console.log;
const macro = v => setTimeout(() => log(v));
const micro = v => Promise.resolve().then(() => log(v));
log(1);
macro(2);
micro(3);
log(4);
// 执行结果
1 4 3 2
可以看出宏任务和微任务都属于异步操作,但是Promise会优于setTimeout先执行。这是因为,在每一次事件循环中,宏任务只会提取一个执行,而微任务会一直提取,直到微任务队列为空为止。换句话说就是 事件队列完成之后,会执行微任务,最后才执行宏任务。
更多关于微任务和宏任务的区别:
setTimeout(() => log(1));
setTimeout(() => log(2), 0);
log(3);
// 因为第一个setTimeout没有设置delay,它将先于第二个setTimeout进入事件队列
// 运行结果
3
1
2
可能会好奇为什么讲这?这是因为下面调度就会用到微任务和宏任务的概念
Rxjs操作符与对应的调度器
- 默认同步的操作符,比如:
of | from | range
,它们默认的调度器为queue
- 默认异步的操作符,比如:
timer | interval
,它们默认的调度器为async
, 内部使用setInterval
改变默认调度器
像一些操作符最后一个参数可以为一个Scheduler
,我们可以通过传参的形式来改变默认的调度器类型,比如下列操作符
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throw
timer
示例:改变 of
默认的调度方式
// RxJSV5.5+版本的引入方式
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';
const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);
// 输出结果
2
1
// 如果使用of默认的调度 queue
of(1).subscribe(val => log(val));
log(2);
// 输出结果
1
2
subscribeOn && observeOn
先说 subscribeOn
,它的作用
- 改变源(
source observables
)的执行时机 - 只能用一次
示例
// 同步版本
const log = console.log;
let a$ = Rx.Observable.create(observer => {
setTimeout(() => observer.next(1)); // A
setTimeout(() => observer.next(2)); // B
setTimeout(() => observer.next(3)); // C
setTimeout(() => observer.complete()); // D
})
let subscription = a$.subscribe({
next: v => log(v), // E
complete: () => log('完成') // F
})
# 它的执行顺序为
A - E
B - E
C - E
D - F
// 异步实现版本
// 使用 'subscribeOn'
import { async } from 'rxjs/scheduler/async';
const log = console.log;
let a$ = Rx.Observable.create(observer => {
setTimeout(() => observer.next(1)); // A
setTimeout(() => observer.next(2)); // B
setTimeout(() => observer.next(3)); // C
setTimeout(() => observer.complete()); // D
})
let subscription = a$.subscribeOn(async) // 使用异步调度
.subscribe({
next: v => log(v), // E
complete: () => log('完成') // F
})
# 现在它的执行顺序为
A - B - C - D
E - E - E - F
另外示例:
import { async } from 'rxjs/scheduler/async';
import { of } from 'rxjs/observable/of';
const log = console.log;
of(1, async).subscribe(val => log(val));
log(2);
// 可以使用 subscribeOn 等同写为
of(1)
.subscribeOn(async)
.subscribe(val => log(val));
log(2);
observeOn
它的作用:
- 改变通知的
Notifications
执行时机,即Observabls中的Next, Error, Complete函数 - 能够用于每个操作符的前面,即可以多次使用
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 设置为 async
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('after subscribe');
// 执行结果
"before subscribe"
"after subscribe"
1
2
3
"complete"
调度 Schedulers
总共有4中调度:
类型 | 执行类型 | 内部调用 |
---|---|---|
queue | Sync同步的方式 | scheduler.schedule(task, delay) scheduler.flush() |
asap | Async(异步微任务) | Promise.resolve().then(() => task) |
async | Async(异步宏任务) | id = setInterval(task, delay) clearInterval(id) |
animationFrame | Async | id = requestAnimationFrame(task) cancelAnimationFrame(id) |
queue
特点:
- 同步执行
- 任务按顺序执行
- 当前任务结束后才执行下一个任务
- 性能优于事件队列
示例
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));
// 执行结果
1
2
3
// 注意这种情况 使用回调
queue.schedule(() => {
queue.schedule(() => log(1));
log(2);
queue.schedule(() => log(3));
});
// 执行结果
2
1
3
asap (as soon as possible)
特点:
- 异步执行(微任务)
- 任务在next tick之前执行,即比宏任务先执行
- 内部实现使用
promise
- 性能优于事件队列
示例
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
setTimeout(() => log(1)); // 异步 宏任务
asap.schedule(() => log(2)); // 异步 微任务
queue.schedule(() => log(3)); // 同步
// 执行结果
3
2
1
async
特点:
- 异步执行(宏任务)
- 内部实现使用
setInterval
- 使用事件队列,性能比上面的方式要差
示例
import { async } from 'rxjs/scheduler/async';
import { asap } from 'rxjs/scheduler/asap';
import { queue } from 'rxjs/scheduler/queue';
const log = console.log;
async.schedule(() => log(1)); // 异步 宏任务
asap.schedule(() => log(2)); // 异步 微任务
queue.schedule(() => log(3)); // 同步
// 执行结果
3
2
1
取消任务 cancelling tasks
使用 AsyncScheduler
和 AsyncAction
用来创建一个异步调用,async.schedule()
方法会返回一个 subcription
import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';
const log = console.log;
const s = new AsyncScheduler(AsyncAction); // 创建一个async示例
const DELAY = 0;
let subscription;
subscription = s.schedule((v) => log(v), DELAY, 1); // 异步调度A
s.schedule((v) => log(v), DELAY, 2); // 异步调度B
log(3);
subscription.unsubscribe(); // 取消异步调度A
// 结果 并没有A的值,因为它被取消了
3
2
内部时钟
now()
方法
上面谈调度器特性时,谈到了虚拟时钟
import { AsyncScheduler } from 'rxjs/scheduler/AsyncScheduler';
import { AsyncAction } from 'rxjs/scheduler/AsyncAction';
const log = console.log;
const s = new AsyncScheduler(AsyncAction); // 创建一个async示例
const DELAY = 2000;
const start = Date.now();
s.schedule((v) => log(v), DELAY, 1); // 异步调度A
s.schedule((v) => log(v), DELAY, 2); // 异步调度B
// s.now() 使用调度内部时钟
s.schedule(() => log(`${s.now() - start}ms`), DELAY)
log(3);
// 结果
3
2
1
2002ms // 这个时间会根据机器的执行速度而定
animationFrame 动画调度
特点:
- 异步执行
- 内部实现使用
requestAnimationFrame
- 适用于 DEVICE FRAME RATE
- 没有激活时很慢
- 平衡 CPU/GPU 负载
60FPS = 1000 / 60ms
使用 setInterval
的问题
- 忽略 DEVICE FRAME RATE
- 会一直运行,很耗电
- 不会考虑 CPU/GPU 负载
let token;
const paintFrame = () => {
// 动画
token = setInterval(paintFrame, 1000/60);
}
paintFrame();
setTimeout(() => clearInterval(token), 2000); // 2s后清除动画
使用 requestFrameAnimation
代替
let token;
const paintFrame = (timestamp) => {
// 动画
token = requestFrameAnimation(paintFrame);
}
paintFrame();
setTimeout(() => cancelFrameAnimation(token), 2000); // 2s后清除动画
使用动画调度示例
import { animationFrame } from 'rxjs/scheduler/animationFrame';
const DELAY = 0;
let state = { angle: 0 };
const div = document.querySelector('.circle');
let subscription;
// 放上去之后就停止动画
div.addEventListener('mouseover', () => {
if (!subscription) return;
subscription.unsubscribe();
});
// 离开之后开始动画
div.addEventListener('mouseout', () => {
subscription = animationFrame.schedule(work, DELAY);
});
// 动画函数
const work = () => {
let { angle } = state;
state = { angle: ++angle%360 };
div.style.transform = `rotate(${angle}deg)`;
subscription = animationFrame.schedule(work, DELAY);
}
subscription = animationFrame.schedule(work, DELAY);
VirtualTime Scheduler 虚拟时间调度
上面虽然列举了调度的4种类型,下面的虚拟时间其实也是一种调度
特点:
- 同步执行
- 通过delay延迟将所有的动作进行排队(Queues all actions sorting by delay)
- 需要手动执行 使用
flush()
示例
import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
const log = console.log;
const s = new VirtualTimeScheduler(VirtualAction); // 创建一个scheduler
const start = Date.now();
// tasks are sorted by delay
s.schedule(v => log(v), 2000, 2); // 2000ms的delay
s.schedule(v => log(v), 50, 2); // 50ms的delay
s.flush(); // 手动执行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`); // 虚拟时钟
log(`Execution: ${Date.now() - start}ms`); // 程序执行时间
// 结果
1
2
3
VirtualTimeScheduler: 2000ms
Execution: 6ms
示例2:
import { VirtualTimeScheduler, VirtualAction } from 'rxjs/scheduler/VirtualTimeScheduler';
import { interval } from 'rxjs/observable/interval';
const log = console.log;
const s = new VirtualTimeScheduler(VirtualAction); // 创建一个scheduler
const start = Date.now();
// 3600 * 1000表示 1小时
// take(24) 表示 24小时 即每个小时运行一次
interval(3600 * 1000, s).pipe(take(24))
.subscribe(v => log(v))
s.flush(); // 手动执行(同步的)
log(3);
log(`VirtualTimeScheduler: ${s.now()}ms`); // 虚拟时钟
log(`Execution: ${Date.now() - start}ms`); // 程序执行时间
// 结果
0
1
2
// ...
23
VirtualTimeScheduler: 86400000ms (1天)
Execution: 25ms
总结
调度器使用到的场景不多,但是了解它,对任务的调度可以跟细粒的进行控制。动画调度用的比较多一点,虚拟时钟调度对定时任务比较有用。