条件再艰苦也要记录的RxJava2笔记

去年我的MBP坏了换的主板,时隔一年TMD主板又坏了!

图片来自网络

我还没觉得压力大呢,结果电脑先扛不住了!说好的苹果质量好呢?

图片来自网络

即使电脑坏了,也不能停下学习的脚步。

看过RxJava源码的同学,有没有类似的感触:类看着看着就不记得了、Observable中的内部类Observer怎么看着类名都差不多啊!

Observable
Observer

加上RxJava中使用了装饰模式,稍不留神就要回头看看刚刚传进来的是哪个类!

只有了解其内部设计思想就不会轻易陷入繁多的类旋涡中,本文主要梳理记录RxJava的工作流程,按我自己理解与掌握的需求做记录

网上也有很多讲解RxJava源码的文章,推荐两个我看过写的思路清晰的博主:

RxJava2 源码解析——流程
Rxjava 2.x 源码系列


正文

简单使用

添加依赖,目前最新版本

    implementation 'io.reactivex.rxjava2:rxjava:2.2.3'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

以下所有源码均来自此版本

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(0);
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "number is " + integer;
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.i("ljf", "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i("ljf", "onNext: ");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("ljf", "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.i("ljf", "onComplete: ");
                    }
                });

订阅关系

RxJava采用的是观察者模式:Observable是被观察者、Observer是观察者,通过subscribe方法将两者关联。

Observable并不是实际事件的产生者,而是Observable所持有的ObservableOnSubscribe对象,它既是事件实际的产生者也是通知Observer的中间人。

Hook方法后面讲

Observable将传入的ObservableOnSubscribe对象,通过装饰模式变成了具体的Observable类(ObservableCreate)

ObservableCreate

ObservableCreate类实现了Observable接口唯一的一个抽象方法subscribeActual。

Scheduler后面讲

紧接使用流程到了map方法,

由于ObservableCreate没有重写map方法,因此它的具体实现在Observable中

Observable

将传入的Function和自身,通过装饰模式生成Observable的具体实现类ObservableMap对象

注意此时source的this指代对象类型为ObservableCreate

ObservableMap

通过两次的装饰模式,现在的Observable对象具体类型为ObservableMap

使用流程紧接到了subscribe方法,ObservableMap类没重写此方法,因此具体的实现在Observable类中

经过判空处理后调用抽象方法subscribeActual,此时Observable对象的类型为ObservableMap

ObservableMap

参数t是我们自己实现的Observer接口对象,function为具体的map函数

source对象为之前传入的ObservableCreate类型的Observable对象

使用装饰模式将t和function生成新对象MapObserver类型的Observer

用ObservableCreate类型的Observable订阅MapObserver类型的Observer

MapObserver

MapObserver将传入的Observer(自己实现的接口)通过父类的构造方法保存到变量downstream

BasicFuseableObserver

当收到通知t时

MapObserver

都会通过mapper的Function将类型由T转为U,并将结果v回调到自己实现的Observer中,从而完成了数据类型由T转U的变换

通知在哪里发送的呢?

ObservableMap

这里的source对象为之前传入的ObservableCreate,同样进入subscribeActual方法

ObservableCreate

ObservableMap类型的observer被组装成CreateEmitter类型的parent,注意这里不是Observer类型

紧接着第一次回调

ObservableCreate

第一次到达用户写的onSubscribe回调方法中,表示注册成功

ObservableCreate

接着第二次回调subscribe

source是ObservableOnSubscribe类型,用户手动实现的接口

通过回调将生成的CreateEmitter类型的变量parent发射器,回调给用户,用户拿到发射器就可以发射原始数据

CreateEmitter

此时发射器持有的是ObservableMap类型的observer,发射器一发射数据,ObservableMap类型的observer就由onNext方法会收到原始数据,在内部做类型转换处理后,通知用户自己实现的Observer接口对象,从而完成整个观察者模式中的事件传递

订阅及通知的总结

RxJava

线程调度

线程调度关键的两个方法subscribeOn、observeOn

subscribeOn

Observable

通过装饰模式生成ObservableSubscribeOn的Observable类

ObservableSubscribeOn

构造方法将参数初始化

了解了上面订阅关系一节内容后,能够得知

下一层的observer将传入进来

将下层的observer装饰成SubscribeOnObserver类型的parent,回调

observer.onSubscribe(parent);

出问题了!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

本层生成的Observer变量parent没有和上一层的Observable进行订阅,如果不订阅的话链式调用就断掉了

代码后面还一句

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

设置取消订阅的操作对象

对象来自

scheduler.scheduleDirect(new SubscribeTask(parent))

scheduler为用户设置的线程调度对象

Scheduler

由于createWorker为抽象方法,这里以Schedulers.io为具体参数IoScheduler为具体实现类进行分析Scheduler的生成后面讲

IoScheduler

组装Task

将传入的Runnable又包裹了一层

有了worker和task开始执行

w.schedule(task, delay, unit);

EventLoopWorker

又将任务给了其他工人

NewThreadWorker

在预处理后终于将Runnable执行了,只是多了两步:1、将Runnable强转Callable;2、结果Future赋值用来控制Callable

这里的executor变量则是大家熟悉的线程池,它在具体Scheduler生成的时候被初始化

这时再回头看看

ObservableSubscribeOn

在run方法中执行了订阅操作,因为Runnable交给了线程池,所以订阅的实际执行操作在子线程

source.subscribe(parent)会触发向上一层的订阅,即从此刻起后续订阅的操作都在此子线程中进行

为什么多次subscribeOn仅一次生效呢!!这里引用Rxjava 2.x 源码系列 - 线程切换 (上)中的一段伪代码来看看

Rxjava 2.x 源码系列 - 线程切换 (上)

subscribeOn并不是没有生效,而是被包裹了几层线程执行,但实际具体的工作在最内层的线程执行

以上只是一方面,还有一方面的原因在Rxjava 2.x 源码系列 - 线程切换 (上)没有提到,我补充一点

SubscribeOnObserver

在SubscribeOnObserver具体执行的方法中,没做处理直接丢给下级的Observer,才会出现如上图伪代码的样式

如果有处理,则会在伪代码第4-5行出现其他任务

这里只是分析源码,考虑作者意图,其实作者的设计是合理的,事件发生的线程没必要切换,完全可以在观察的线程中切换以达到业务需求(具体可能要慢慢体会)

observeOn

Observable

observeOn方法没有被子类所重写,具体的实现在Observable类中

Scheduler被装饰成ObservableObserveOn类型的Observable,并返回出去

ObservableObserveOn

与其他类型的Observable一样的套路,构造函数仅存储变量,在被subscribe时才触发实际操作

ObservableObserveOn

subscribeActual方法中有两个分支:当前线程、指定线程

  • 当前线程
    如果Scheduler的类型是TrampolineScheduler,则跳过这一层的注册,直接将下一层的observer与上一层的source进行注册。

TrampolineScheduler这个Scheduler做了什么呢?

TrampolineScheduler

直接将Runnable给run了,当然源码逻辑并没有走到run

  • 指定线程

将传入的observer装饰成了ObserveOnObserver类

        @Override
        public void onSubscribe(Disposable d) {//第一次被回调的方法
            if (DisposableHelper.validate(this.upstream, d)) {/判断upstream是否为空,第一次走此方法,上一层一定为空
                this.upstream = d;//保存上层
                if (d instanceof QueueDisposable) {//上层设置了任务执行的所在线程,即多次调用observeOn
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);//得到上层的线程调度

                    if (m == QueueDisposable.SYNC) {//上层为同步
                        sourceMode = m;
                        queue = qd;
                        done = true;//记录变量,本层完成工作记录done状态
                        downstream.onSubscribe(this);//回调下层
                        schedule();//执行任务
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {//上层为异步
                        sourceMode = m;
                        queue = qd;//记录变量
                        downstream.onSubscribe(this);//回调下层
                        return;
                    }
                }
                //上层没有设置所在线程,第一次调用observeOn
                queue = new SpscLinkedArrayQueue<T>(bufferSize);//初始化任务队列

                downstream.onSubscribe(this);//回调下层
            }
        }

通过上面代码的分析,ObserveOnObserver在被onSubscribe回调时,初始化了本层的变量。

在ObservableObserveOn.subscribeActual时判断了一次所在线程,在ObserveOnObserver.onSubscribe时又判断了一次所在线程

        @Override
        public void onNext(T t) {
            if (done) {//已完成
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);//能来到这里说明事件的处理在本层,将t加入队列
            }
            schedule();
        }

传来的事件经过处理,紧接着被调度

        void schedule() {
            if (getAndIncrement() == 0) {//自增整型,只能被调用一次
                worker.schedule(this);
            }
        }
        @Override
        public void run() {
            if (outputFused) {//上层处理
                drainFused();
            } else {//本层处理
                drainNormal();
            }
        }

worker的实际操作在run方法中,所以worker所在的线程即实际工作的线程

        void drainNormal() {//本层处理
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {//判断是否处理完成
                    return;
                }

                for (;;) {//取队列数据
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);//回调下一层
                }

                missed = addAndGet(-missed);//原子整型操作
                if (missed == 0) {
                    break;
                }
            }
        }
        void drainFused() {//上层处理
            int missed = 1;

            for (;;) {
                if (disposed) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if (!delayError && d && ex != null) {
                    disposed = true;
                    downstream.onError(error);
                    worker.dispose();
                    return;
                }

                downstream.onNext(null);//回调下层null,与本层处理null形成对应关系

                if (d) {
                    disposed = true;
                    ex = error;
                    if (ex != null) {
                        downstream.onError(ex);
                    } else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

这里的逻辑比较复杂,总结一句话:本层处理的直接操作队列,上层处理的回调下层null,本层处理与上层处理关键看回调onNext出现的位置

补充内容

Hook方法

源码中很多地方出现了RxJavaPlugins.XX方法,简单看一下代码

RxJavaPlugins

RxJavaPlugins中所有的变量都有volatile关键字

RxJavaPlugins

RxJavaPlugins中所有的方法都有static关键字

以onAssembly方法为例

Observable

Observable在RxJavaPlugins中经过处理后返回

RxJavaPlugins

onObservableAssembly为Function类型

RxJavaPlugins

以传入的Observable类型的source作为参数,经过onObservableAssembly的处理转换后返回

而onObservableAssembly又是通过get、set方法设置的

RxJavaPlugins的使用在一些关键的位置有埋点,当代码执行到指定位置时,我们只需将需要的操作设置到RxJavaPlugins中,代码便会自动的走到扩展的代码中,充分体现了RxJava2的扩展性。

Scheduler

它是一个抽象类,内部还有一个抽象类Worker

Scheduler

具体的实现类有以下几种

Scheduler

使用时通过Schedulers默认初始化的Scheduler

提供的Scheduler都是静态常量,这也是建议使用默认提供Scheduler的原因,不管你用不用反正都初始化占好内存了,为何不用呢!浪费。

每种Scheduler都有一个Worker与之对应,这里就不详拆每一个Scheduler了,都是一些基本的线程操作可以参考这里:
Android中的线程问题

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容