前两篇文章介绍了RxJava的Single和Subject,想必大家对RxJava已经有简单的认识,并且可以写出异步任处理的具体实现了,本文会介绍Scheduler调度器的使用。
之前文章中我们有提到过subscribeOn和observeOn两个方法,他们都接收一个Scheduler参数做为传入值。主要作用就是指定异步任务Obserable和订阅者Subscribe两者处理的线程。
下表展示RxJava中常见的调度器种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
除了之前我们介绍过的subscribeOn/observeOn会用到Scheduler。在RxJava中还有Worker会用到它。
比如传统的创建线程:
new Thread(new Runnable() {
@Override
public void run() {
// TODO
}
}).start();
在RxJava中,我们可以这样实现:
Schedulers.newThread().createWorker().schedule(new Action0() {
@Override
public void call() {
// TODO
}
});
这样的好处是我们可以直接使用RxJava的线程调度逻辑。并且在合适的时机终止线程
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
// TODO
}
});
worker.unsubscribe();
除此之外,还有个好处是更容易的实现延迟执行和周期执行:
// 延迟
Schedulers.computation().createWorker().schedule(new Action0() {
@Override
public void call() {
// TODO
}}, 500, TimeUnit.MILLISECONDS);
// 周期
Schedulers.computation().createWorker().schedulePeriodically(new Action0() {
@Override
public void call() {
// TODO
}}, 500, 250, TimeUnit.MILLISECONDS);
总结
使用Scheduler可以管理订阅者和被订阅者的处理线程,并且通过Scheduler.Worker直接实现线程调度和延迟周期逻辑。
下篇介绍Observables。