今天要给大家介绍一下RxJava里面线程切换的第一个操作符subscribeOn()操作符。
说到subscribeOn,可能很多同学还不太清楚实际使用它的时机。那么我们先来看看一个小例子。
大家可以先猜想一下打印的结果。再结合上一篇的map操作符源码分析的结论来看看,每次的log的顺序。
1、map和onNext方法分别发生在哪个线程?
2、log打印的顺序是怎么样?是每次把同一个数字的map里面的log和onNext先打印了再打印下一个?还是说先把所有的数字的map里面的log打印完毕之后再打印onNext?
让我们来看看实际的打印结果:
如果充分理解了上一篇map操作符的同学想必已经答对了。如果没有理解的同学建议好好看看上一篇,消化好了再来重头看看这篇。因为map是非常基础的操作符,理解好map对理解接下来的线程转换操作符非常有帮助。
没错,相信很多同学都已经知道了,subscribeOn操作符会影响在subscribeOn上面的代码的执行线程,比如上图里面,subscribeOn将发射数字和map的执行过程放在了io线程里面执行。(observeOn刚好相反,会将执行在observeOn下面的代码放入所指定的线程中执行)
ok,那么假如说咱们的需求改变了,发射的时候我们要运行在IO线程,但是map操作符的变换我们想在安卓的主线程执行!
嘿嘿,学以致用。咱们现在就动手,代码变成了下面的:
哈哈,欢天喜地,让我们来看看线程是不是真的如我们所愿改变好了:
不对啊???!!!
说好的subscribeOn会改变上面代码的执行线程嘛???怎么不对了
先放上结论:同一个Observable,无论你call多少次subscribeOn,只有第一个subscribeOn会起作用。
那究竟为什么呢?
咱们先上一个小小的“比喻”;
大家应该都可以理解,上图中的Log将会在线程3打印出来。纵使你在外面套再多层thread,最后Log只会在最里面的线程执行。RxJava里面subscribeOn就是这么个意思。只不过不是这么直白的用Thread来执行,而是把所有的代码包装进Runnable里面,然后把Runnable丢进一个个ExecutorService里面执行。
不了解ExecutorService的同学先看看这里:http://tutorials.jenkov.com/java-util-concurrent/executorservice.html
那么我们就来一探究竟吧!跟踪代码!
subscribeOn在执行只后会生成一个中间层Observable,ScalarSynchronousObservable,然后再执行lift(),然后就像上一篇文章里面讲的lift(),也会返回一个新的Observable。那么这一下我们就有了三个Observable了。注意这个中间层Observable也是持有源Observable的引用的。
如果还熟悉咱们的分析流程的话,就知道接下来我们应该从哪个Observable开始分析了。
大家应该还记得,Observable在转换的时候,最关键的就是那个lift()方法,生成新的subscriber传递给上一层的Observable,那么我们看看subscribeOn的操作符在Rxjava里面的对应类OperatorSubscribeOn(在图1的8234行出现),是怎么把原有的subscriber改造的
在RxJava中,每一个Scheduler都对应了一个ExecutorService,每一个ExecutorService都拥有自己的线程池,这个线程池所有的线程都有相同的前缀。点击这里查看怎样给ExecutorService分配线程池。
所以在上图中line 41就是通过Scheduler获取对应的ExecutorService。而这个操作符生成的新的subscriber是一个Observable类型的subscriber。每次在执行onNext的时候会把onNext中的参数的Observable发射到线程池中subscribe().(line 57, schedule()方法出卖了inner对象的本质,ExecutorService的生成过程大家可以自己跟踪代码查看)
大家可能看到这里就有点点晕了,这哪跟哪啊。。。为什么要把原有的subscriber包装给一个Observable类型的subscriber呢?
这时候我们别着急,先仔细想想一个最重要的,贯穿在整个RxJava所有操作符的一个问题:
在lift方法里面,我们新生成的subscriber会传给谁使用?
lift方法里面生成的新的subscriber会传给上一个Observable使用。那在subscribeOn的这个例子里面,上一层的Observable是谁?
是中间生成的那个Observable!它的执行过程是啥样的,咱们来瞅瞅
显然,这个中间层ScalarSynchronousObservable,它只做了一件事情,把它上一层的Observable传进OnSubscribe的call方法里面(还记得它的构造函数的调用嘛?参数就是上一层的Observable),交给下一层传上来的subscriber处理,执行其onNext方法。
call里面的subscriber,就是我们在图3里面生成的新的Subscriber。
再回想一下新生成subscriber的onNext做了啥?
这下就很明显了,这个中间层Observable,会在我们subscribe只后,把它上一层的Observable发射到另一个线程中subscribe();也就是说一个简单的subscribeOn,如下图:
图5和图6的效果是一样的!也就说,subscribeOn最简单的解释,就是把上面的代码,或者说上一层的Observable丢到一个新的线程去发射元素了。
用一个图来表达执行的流程:
相信这也就解释了为什么,每个Observable只有第一个subscribeOn会起作用了。
但是问题来了,假如说我在subscribeOn之后再执行了若干map操作呢?为何之后的map也是发射在subscribeOn指定的线程呢?
其实想想很简单,虽然我们在subscribeOn之后,只会将上一层的Observable发射在新的线程,但是最上层的Observable拿到的subscriber已经是经由最下层subscriber一路包装上来的,包含所有转换的subscriber,也就是说,我们所有的map的转换,不管是发生在subscribeOn之前还是之后,最后都会在subscribeOn指定的线程里面执行
subscribeOn是一个非常重要的线程切换操作符,希望大家可以结合上一篇文章,好好理解它的用法。同时,因为我并没有详细介绍,Scheduler是怎样生成对应的ExecutorService,以及ExecutorService是怎么样维护自己的线程池的,大家可以自行跟踪代码看看不同Scheduler的实现。在看完IO,Computation,newThread这三个线程调度器之后,大家可以在不看源代码的情况下,想想AndroidScheduler.mainThread()这个安卓的主线程调度器可以怎么实现,这样有助于大家举一反三的思考。
今天就到这,下周我会继续讲解observeOn操作符。RxJava的系列也就到了尾声啦!
各位周末愉快!热热身晚上准备dota2五连黑了