隔了好久,终于有时间更新一下RxJava进阶的最后一篇文章了。前几个星期有幸参加了今年的Google I/O(谷歌举办的一年一度的开发者大会,今年在谷歌总部举行),去了一趟迷人的加州,于是文章被拖延了。有兴趣的朋友可以看看我上一篇类似于日志的google之行文章。
上一次我们介绍了subscribeOn操作符的源码,那么这一次回到正题,把observeOn操作符的用法和源码过一遍。
我们先看看这个例子:
转换很简单,我们先发射1,2,3这三个数字,在IO线程进行第一次转换,在computation线程进行第二次转换,最后在newThread线程进行打印。我们来观察一下打印结果。大家可以根据自己对observeOn操作符的理解先预测一下。(注意我在发射元素的地方进行了sleep操作,暂停当前线程一秒钟,原因之后会解释)
按照我们之前对map操作符的理解,这组操作应该会是把对数字1先后进行转换1,转换2,打印,然后在对2做同样的操作。
那么我们来看看结果是否是这样?
哈哈,终于,我们这次预测对了。的确是按照我们事前所想的那样把三个数字分别打印出来的。observeOn操作符的作用也正是如此,每次使用observeOn,都会把这个observeOn下面的操作包装在该observeOn指定的线程池中运行。
还记得上一篇讲过的subscribeOn操作符嘛,subscribeOn只能作用一次,而observeOn可以作用多次。原因我们接下来也会讲。
那么我们跟踪源码,由于我们之前几篇已经讲过RxJava的lift方法和大概的工作原理,这篇我就不多说lift的具体作用,假设大家已经了解相关知识了,如果不了解的请从map的源码分析开始。
observeOn操作符会用OperatorObserveOn改变原有的Subscriber(大家应该还记得原有的subscriber是从哪来的吧?没错这里说的原有的subscriber就是从下一级传上来的subscriber)。
在这个操作符的call方法里面(大家还记得lift方法会用操作符的call方法改变原有subscriber对吧?),生成新的Susbcriber,ObserveOnSubscriber
新的Subscriber里面,结构有点点复杂,有一个队列,然后把原有的subscriber的引用传进来,作为自己的成员对象。
新的subscriber里面,我们重点关注onNext()方法做了些啥,因为每次发射元素,都是先调用新的subscriber的onNext()方法的。
新的onNext方法会先把我们发射的原始元素添加到队列里面(line129),如果发生错误,执行onError,并且返回(line 130 - 131)。假如添加元素成功,执行schedule()。
在创建了一个新的Action对象之后,我们把对象放入一个Scheduler里面执行了。如果大家还记得上一篇文章的内容,应该了解这个Action和Scheduler的关系。类似于Java里面Runnable对象和ExecutorService一样,最后Action里面的操作会被放入线程池,寻找合适的线程执行。
所以其实最后兜兜转转,最重要的,需要执行的方法就是这个pollQueue(),这个方法从名字就可以看得出来是要获取队列元素了。而事实也的确如此,我们看看pollQueue方法里面最重要的部分。
pollQueue方法使用了一个死循环(line 182),不断的去查看queue里面的元素。我们这里重点关注line 200 -202, 在把队列里面的第一个元素取出之后,我们交给child,也就是包装进来的,来自下一级的subscriber,让它来执行它的onNext()。这样,我们的原始的Observable的第一个元素就这样处理完毕了。
这整一个pollQueue方法,因为被包装在Action对象里面,而Action对象又会被放进Scheduler,也就是RxJava包装过之后的ExecutorService里面执行,所以它是会运行在我们指定的那个线程里面的。我们可以在debug模式下面验证一下。
所以总结一下的话,使用observeOn,会在原有的Observable发射元素的时候,将元素依次添加到一个队列中,并异步的(使用一个新的线程去执行)不停的去获取队列的第一个元素,使用下一级的subscriber处理(onNext())该元素。
那么我们再将思维拓展一下,如果在observeOn后面跟的不是subscribe()方法,而是一个map方法。会有什么不同呢?
如果是map的话,大家应该还记得,下一级传上来的subscriber也不会是我们在subscribe()方法里面传进来的subscriber,而是经过OperatorMap改造过的subscriber了。
也就是说,pollQueue方法的line202,child成员就是OperatorMap改造后的subscriber,那么执行的onNext()也就是上图line52-58的这个onNext方法了,line 54中,我们会在observeOn操作符里面的定义的线程中,执行transformer.call(t),也就是说,我们保证了map操作符里面的call()方法(就是我们进行类型转换的方法)是执行在observeOn所定义的线程中,然后在执行o.onNext(),这个o对象,又是下一级传上来的subscriber。
所以,假如我们遇到了多次的ObserveOn+Map的转换的话,发射元素的流程就变成这样了:
回到我们开头的问题,为什么我们要在原始的Observable发射元素之间,暂停当前线程一秒?
理由很简单,因为如果不这样做的话,我们的map转换会被放到ExectuorService里面执行,是异步执行的,所以很有可能会出现;第一层转换都执行完毕,才执行下一层转换,最后日志的打印结果就不是我们预测的那样了。
大家想象一下,把上图的第二和第三层向右方无限拉长,拉倒第三map之后,就会出现RxJava似乎是在处理完一层之后再处理下一层的错觉。
那么到此为止,RxJava进阶的所有专题都结束了,其实关于RxJava,还有很多东西我们都没接触,例如怎么handle Subscription等等。这也就留给有兴趣的同学自己慢慢摸索了。接下来我会专心准备Android Tv的教程给大家。希望有兴趣开发安卓TV应用的同学们留意一下 ;)
另外,很高兴勇士赢球,作为一个库里球迷,我忍不住上个图: