目录
一、Is what 是什么
二、Concept 概念
三、Basic realization 基本实现
四、Scheduler 线程控制(上)
五、Scheduler 线程控制(下)
六、变换
因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。
API(二)
通过subscribeOn()和observeOn()控制线程,让事件的产生和消费发生在不同的线程。同样,在使用变换的方法(map()、flatMap())时,可以多次切换线程。
线程多次切换
observeOn() 指定的是Subscriber的线程,而这个Subscriber并不是(严格说应为『不一定是』)subscriber()参数中的Subscriber,而是observeOn()执行时的当前Observable所对应的Subscriber,即它的直接下级Subscriber。换句话说,observeOn()指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个切换线程的位置调用一次observeOn()即可,示例如下:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
不同于
observeOn(),subscribeOn()的位置放在哪里都可以,只被调用一次。
含多个subscribeOn()时,只有第一个会起作用。
Scheduler 的原理(二)
subscribeOn()和observeOn()的内部实现,是用lift()。
subscribeOn()示意图:

observeOn()示意图:

相同点:subscribeOn()和observeOn()都做了线程切换的操作(图中"schedule..."部分)。
不同点:
-
subscribeOn()的线程切换发生在OnSubscribe中,即在它通知上一级OnSubscribe时,此时事件还没有开始发送,因此subscribeOn()的线程控制可以从事件发出的开端就造成影响; -
observaOn()的线程切换发生在它内建的Subscriber中,即发生在它即将给下级Subscriber发送事件时,因此observeOn()控制的是它后面的线程。
下面用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的:

图中含 5 处对事件的操作:
①和②两处受第一个 subscribeOn() 影响,运行在红色线程;
③和④处受第一个 observeOn() 影响,运行在绿色线程;
⑤处受第二个 observeOn() 影响,运行在紫色线程;
第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。
doOnSubscribe()
虽然超过一个的subscribeOn()对事件处理的流程没有影响,但在流程之前却可以利用。
因为Subscriber的onStart()可以用作流程开始前的初始化,而onStart()在subscribe()发生时就被调用了,因此不能指定线程,只能执行在subscribe()被调用时的线程。试想,如果onStart()中含有对线程有要求的代码(例:在界面显示ProgressBar,必须在主线程执行),将会有线程非法的风险,因为有时无法预测subscribe()将会在什么线程执行。
与Subscriber.onStart()相对应的方法Observable.doOnSubscribe()。和Subscriber.onStart()同样是在sbscribe()调用后且事件发送前执行,区别在于可以指定线程。
- 默认情况:
doOnSubscribe()执行在subscribe()发生的线程; -
doOnSubscribe()后有subscribeOn()时,将执行在离他最近的subscribeOn()所指定的线程。
示例:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);