去年我的MBP坏了换的主板,时隔一年TMD主板又坏了!
我还没觉得压力大呢,结果电脑先扛不住了!说好的苹果质量好呢?
即使电脑坏了,也不能停下学习的脚步。
看过RxJava源码的同学,有没有类似的感触:类看着看着就不记得了、Observable中的内部类Observer怎么看着类名都差不多啊!
加上RxJava中使用了装饰模式,稍不留神就要回头看看刚刚传进来的是哪个类!
只有了解其内部设计思想就不会轻易陷入繁多的类旋涡中,本文主要梳理记录RxJava的工作流程,按我自己理解与掌握的需求做记录
网上也有很多讲解RxJava源码的文章,推荐两个我看过写的思路清晰的博主:
正文
简单使用
添加依赖,目前最新版本
implementation 'io.reactivex.rxjava2:rxjava:2.2.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
以下所有源码均来自此版本
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(0);
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "number is " + integer;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("ljf", "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i("ljf", "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.i("ljf", "onError: ");
}
@Override
public void onComplete() {
Log.i("ljf", "onComplete: ");
}
});
订阅关系
RxJava采用的是观察者模式:Observable是被观察者、Observer是观察者,通过subscribe方法将两者关联。
Observable并不是实际事件的产生者,而是Observable所持有的ObservableOnSubscribe对象,它既是事件实际的产生者也是通知Observer的中间人。
Hook方法后面讲
Observable将传入的ObservableOnSubscribe对象,通过装饰模式变成了具体的Observable类(ObservableCreate)
ObservableCreate类实现了Observable接口唯一的一个抽象方法subscribeActual。
Scheduler后面讲
紧接使用流程到了map方法,
由于ObservableCreate没有重写map方法,因此它的具体实现在Observable中
将传入的Function和自身,通过装饰模式生成Observable的具体实现类ObservableMap对象
注意此时source的this指代对象类型为ObservableCreate
通过两次的装饰模式,现在的Observable对象具体类型为ObservableMap
使用流程紧接到了subscribe方法,ObservableMap类没重写此方法,因此具体的实现在Observable类中
经过判空处理后调用抽象方法subscribeActual,此时Observable对象的类型为ObservableMap
参数t是我们自己实现的Observer接口对象,function为具体的map函数
source对象为之前传入的ObservableCreate类型的Observable对象
使用装饰模式将t和function生成新对象MapObserver类型的Observer
用ObservableCreate类型的Observable订阅MapObserver类型的Observer
MapObserver将传入的Observer(自己实现的接口)通过父类的构造方法保存到变量downstream
当收到通知t时
都会通过mapper的Function将类型由T转为U,并将结果v回调到自己实现的Observer中,从而完成了数据类型由T转U的变换
通知在哪里发送的呢?
这里的source对象为之前传入的ObservableCreate,同样进入subscribeActual方法
ObservableMap类型的observer被组装成CreateEmitter类型的parent,注意这里不是Observer类型
紧接着第一次回调
第一次到达用户写的onSubscribe回调方法中,表示注册成功
接着第二次回调subscribe
source是ObservableOnSubscribe类型,用户手动实现的接口
通过回调将生成的CreateEmitter类型的变量parent发射器,回调给用户,用户拿到发射器就可以发射原始数据
此时发射器持有的是ObservableMap类型的observer,发射器一发射数据,ObservableMap类型的observer就由onNext方法会收到原始数据,在内部做类型转换处理后,通知用户自己实现的Observer接口对象,从而完成整个观察者模式中的事件传递
订阅及通知的总结
线程调度
线程调度关键的两个方法subscribeOn、observeOn
subscribeOn
通过装饰模式生成ObservableSubscribeOn的Observable类
构造方法将参数初始化
了解了上面订阅关系一节内容后,能够得知
下一层的observer将传入进来
将下层的observer装饰成SubscribeOnObserver类型的parent,回调
observer.onSubscribe(parent);
出问题了!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
本层生成的Observer变量parent没有和上一层的Observable进行订阅,如果不订阅的话链式调用就断掉了
代码后面还一句
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
设置取消订阅的操作对象
对象来自
scheduler.scheduleDirect(new SubscribeTask(parent))
scheduler为用户设置的线程调度对象
由于createWorker为抽象方法,这里以Schedulers.io为具体参数IoScheduler为具体实现类进行分析Scheduler的生成后面讲
组装Task
将传入的Runnable又包裹了一层
有了worker和task开始执行
w.schedule(task, delay, unit);
又将任务给了其他工人
在预处理后终于将Runnable执行了,只是多了两步:1、将Runnable强转Callable;2、结果Future赋值用来控制Callable
这里的executor变量则是大家熟悉的线程池,它在具体Scheduler生成的时候被初始化
这时再回头看看
在run方法中执行了订阅操作,因为Runnable交给了线程池,所以订阅的实际执行操作在子线程
source.subscribe(parent)
会触发向上一层的订阅,即从此刻起后续订阅的操作都在此子线程中进行
为什么多次subscribeOn仅一次生效呢!!这里引用Rxjava 2.x 源码系列 - 线程切换 (上)中的一段伪代码来看看
subscribeOn并不是没有生效,而是被包裹了几层线程执行,但实际具体的工作在最内层的线程执行
以上只是一方面,还有一方面的原因在Rxjava 2.x 源码系列 - 线程切换 (上)没有提到,我补充一点
在SubscribeOnObserver具体执行的方法中,没做处理直接丢给下级的Observer,才会出现如上图伪代码的样式
如果有处理,则会在伪代码第4-5行出现其他任务
这里只是分析源码,考虑作者意图,其实作者的设计是合理的,事件发生的线程没必要切换,完全可以在观察的线程中切换以达到业务需求(具体可能要慢慢体会)
observeOn
observeOn方法没有被子类所重写,具体的实现在Observable类中
Scheduler被装饰成ObservableObserveOn类型的Observable,并返回出去
与其他类型的Observable一样的套路,构造函数仅存储变量,在被subscribe时才触发实际操作
subscribeActual方法中有两个分支:当前线程、指定线程
- 当前线程
如果Scheduler的类型是TrampolineScheduler,则跳过这一层的注册,直接将下一层的observer与上一层的source进行注册。
TrampolineScheduler这个Scheduler做了什么呢?
直接将Runnable给run了,当然源码逻辑并没有走到run
- 指定线程
将传入的observer装饰成了ObserveOnObserver类
@Override
public void onSubscribe(Disposable d) {//第一次被回调的方法
if (DisposableHelper.validate(this.upstream, d)) {/判断upstream是否为空,第一次走此方法,上一层一定为空
this.upstream = d;//保存上层
if (d instanceof QueueDisposable) {//上层设置了任务执行的所在线程,即多次调用observeOn
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);//得到上层的线程调度
if (m == QueueDisposable.SYNC) {//上层为同步
sourceMode = m;
queue = qd;
done = true;//记录变量,本层完成工作记录done状态
downstream.onSubscribe(this);//回调下层
schedule();//执行任务
return;
}
if (m == QueueDisposable.ASYNC) {//上层为异步
sourceMode = m;
queue = qd;//记录变量
downstream.onSubscribe(this);//回调下层
return;
}
}
//上层没有设置所在线程,第一次调用observeOn
queue = new SpscLinkedArrayQueue<T>(bufferSize);//初始化任务队列
downstream.onSubscribe(this);//回调下层
}
}
通过上面代码的分析,ObserveOnObserver在被onSubscribe回调时,初始化了本层的变量。
在ObservableObserveOn.subscribeActual时判断了一次所在线程,在ObserveOnObserver.onSubscribe时又判断了一次所在线程
@Override
public void onNext(T t) {
if (done) {//已完成
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);//能来到这里说明事件的处理在本层,将t加入队列
}
schedule();
}
传来的事件经过处理,紧接着被调度
void schedule() {
if (getAndIncrement() == 0) {//自增整型,只能被调用一次
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {//上层处理
drainFused();
} else {//本层处理
drainNormal();
}
}
worker的实际操作在run方法中,所以worker所在的线程即实际工作的线程
void drainNormal() {//本层处理
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {//判断是否处理完成
return;
}
for (;;) {//取队列数据
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);//回调下一层
}
missed = addAndGet(-missed);//原子整型操作
if (missed == 0) {
break;
}
}
}
void drainFused() {//上层处理
int missed = 1;
for (;;) {
if (disposed) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
}
downstream.onNext(null);//回调下层null,与本层处理null形成对应关系
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
这里的逻辑比较复杂,总结一句话:本层处理的直接操作队列,上层处理的回调下层null,本层处理与上层处理关键看回调onNext出现的位置
补充内容
Hook方法
源码中很多地方出现了RxJavaPlugins.XX方法,简单看一下代码
RxJavaPlugins中所有的变量都有volatile关键字
RxJavaPlugins中所有的方法都有static关键字
以onAssembly方法为例
Observable在RxJavaPlugins中经过处理后返回
onObservableAssembly为Function类型
以传入的Observable类型的source作为参数,经过onObservableAssembly的处理转换后返回
而onObservableAssembly又是通过get、set方法设置的
RxJavaPlugins的使用在一些关键的位置有埋点,当代码执行到指定位置时,我们只需将需要的操作设置到RxJavaPlugins中,代码便会自动的走到扩展的代码中,充分体现了RxJava2的扩展性。
Scheduler
它是一个抽象类,内部还有一个抽象类Worker
具体的实现类有以下几种
使用时通过Schedulers默认初始化的Scheduler
提供的Scheduler都是静态常量,这也是建议使用默认提供Scheduler的原因,不管你用不用反正都初始化占好内存了,为何不用呢!浪费。
每种Scheduler都有一个Worker与之对应,这里就不详拆每一个Scheduler了,都是一些基本的线程操作可以参考这里:
Android中的线程问题