Rxjava线程切换原理学习

Rxjava可以非常方便的完成线程的切换,链式调用这种艺术般的设计深受开发者的喜爱。本节通过源码来深入了解一下这其中的原理。
网上有很多介绍的文章,但大部分都有些晦涩难懂。本文旨在由浅入深,一步步深入“虎穴”。

最基本调用

先看一下最简单的调用方式:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("subscribe---" + Thread.currentThread().getName());
                System.out.println("发送数据:hello \n");
                e.onNext("hello");
                e.onComplete();
            }
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
                        System.out.println("接收数据:" + s);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete---" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                    }
                });

可以说没有添加任何逻辑,只有发送与接收。这里把每一步的所在线程,发送和接收的数据,打印了出来。

运行结果:

onSubscribe---main

subscribe---main
发送数据:hello

onNext---main
接收数据:hello

onComplete---main

注意一下subscribe方法,代码里出现了两个。
第一个是我们发送数据用的,属于ObservableOnSubscribe接口;
第二个是提交整个链式调用的,属于ObservableSource接口。
不要搞混了,打印出来的是第一个。
然后注意一下打印出来的顺序,onSubscribe是先于subscribe,稍后我们从源码中查看原因。

整个代码片段用到了两个Rxjava的方法,create创建最初的数据源;ObservableSourcesubscribe创建数据接收者。

下面我们先看看create的实现:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

总共做了3件事:

  1. 检查source是否为空,合法性检查很常见,后面不再关注;
  2. 创建一个ObservableCreate对象,将source传给它;
  3. 调用RxJavaPlugins.onAssembly方法,以ObservableCreate为参,并最终返回该方法的返回值。

我们先看看RxJavaPlugins.onAssembly是干嘛的,代码不搞透彻总觉得不够处女座:

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

从类名称也可以看出来,RxJavaPlugins是一个hook类。说人话就是,我们可以设置一些自己的操作。
比如上述方法中的onObservableAssembly我们可以设置一下,打印Observable的类名:

RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Exception {
                System.out.println(observable);
                return observable;
            }
        });

看看结果:

io.reactivex.internal.operators.observable.ObservableCreate@e2144e4

onSubscribe---main

subscribe---main
发送数据:hello 

onNext---main
接收数据:hello

onComplete---main

可以看到打印出来的就是第2步提到的ObservableCreate类。下面就来看一下这个类的构造方法:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

ObservableCreate会把传进来的source先保存下来。这里的source就是我们创建的用来发送数据的ObservableOnSubscribe。如下:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("subscribe---" + Thread.currentThread().getName());
                System.out.println("发送数据:hello \n");
                e.onNext("hello");
                e.onComplete();
            }
        })

到此,create的流程走完了,我们看一下subscribe,注意是链式调用里面的:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

看似很长的代码,其实真正相关的就是调用了subscribeActual方法:

    protected abstract void subscribeActual(Observer<? super T> observer);

这是一个抽象方法,具体的实现,我们去子类中看。这里的子类就是我们的ObservableCreate

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
  1. observer包装成CreateEmitter对象;
  2. 调用observer.onSubscribeCreateEmitter作为参数;
  3. 调用source.subscribe, CreateEmitter作为参数。

再次说明一下,这里的observersource是我们自己写的!
这里也能看到,onSubscribe是最先执行的方法
我们调用e.onNext("hello");发送数据,这个e就是CreateEmitter对象。

然后看一下CreateEmitter:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

这里省略部分代码。该类是对observer的一个封装。
需要注意的是,这里的onNext onComplete继承自Emitter接口,而不是Observer接口。
Rxjava好多接口中存在同名方法,可能产生混乱,要记得区分。

流向图 箭头颜色代表不同线程

添加subscribeOn

所有代码如下:

        RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Exception {
                System.out.println(observable);
                return observable;
            }
        });

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                System.out.println("subscribe---" + Thread.currentThread().getName());
                System.out.println("发送数据:hello \n");
                e.onNext("hello");
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.single())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println();
                        System.out.println("onSubscribe---" + Thread.currentThread().getName() + '\n');
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext---" + Thread.currentThread().getName() + '\n');
                        System.out.println("接收数据:" + s);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete---" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                    }
                });

添加了subscribeOn(Schedulers.single()),看下打印结果:

io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005

onSubscribe---main

subscribe---RxSingleScheduler-1
发送数据:hello 

onNext---RxSingleScheduler-1
接收数据:hello

onComplete---RxSingleScheduler-1

有如下变化:

  1. 多了一个ObservableSubscribeOn对象,该对象的创建过程与ObservableCreate类似
  2. onSubscribe依旧在主线程调用,其他的都是在子线程

我们依次创建了ObservableCreateObservableSubscribeOn两个对象,那最终调用的就是ObservableSubscribeOnsubscribe方法。

先看一下ObservableSubscribeOn的构造器:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

除了source,还多了一个我们传入的Scheduler。
注意这里的source已经是ObservableCreate对象了。因为我们是在ObservableCreate对象上执行的subscribeOn方法。

我们最终调用的是ObservableSubscribeOnsubscribe方法,会执行到:

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到,还是会先在subscribe方法调用的线程调用onSubscribe,然后在scheduler设置的线程运行SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

任务很简单,在scheculer设置的线程执行source的subscribe方法。这样就完成了线程的切换。
这里还涉及到一个很重要的SubscribeOnObserver类:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

       void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
}

该类的作用:

1、保证只有一次onSubscribe回调,如下:

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

ObservableCreate调用的是这个类中的onSubscribe,该回调并不会继续向下传递,保证只回调一次。
2、及时dispose
3、其他

数据流向图:

流向图 箭头颜色代表不同线程

添加observeOn

subscribeOn后面添加一句observeOn(Schedulers.computation()),看下输出:

io.reactivex.internal.operators.observable.ObservableCreate@e2144e4
io.reactivex.internal.operators.observable.ObservableSubscribeOn@1ee0005
io.reactivex.internal.operators.observable.ObservableObserveOn@25618e91

onSubscribe---main

subscribe---RxSingleScheduler-1
发送数据:hello 

onNext---RxComputationThreadPool-1
接收数据:hello

onComplete---RxComputationThreadPool-1

可以看到,又多了一个ObservableObserveOn类。

看一下构造器:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

同样传入sourcescheduler

  • delayError——true,则接收到onError事件后,立即停止onNext分发,并向下传递onError;false,则等待队列中的onNext分发完毕后,再分发onError事件。
  • bufferSize——缓冲区大小,默认是128

我们最终调用的是ObservableObserveOnsubscribe方法,会执行到:

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

若传入的是Schedulers.trampoline(),则不作任何处理,直接把下游的observer传递给上游的source;否则使用ObserveOnObserver包装下游的observer,并将ObserveOnObserver传递给上游的source

先看下ObserveOnObserveronSubscribe方法:

        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
               ...
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //向下传递
                actual.onSubscribe(this);
            }
        }

这里省略了一些代码,先不去管它。可以看到会创建一个SpscLinkedArrayQueue对象,初始大小为我们设置的bufferSize。实际上,它的结构是这样的:

SpscLinkedArrayQueue结构

可以看到是一个链式的数组,可以无限扩容,每次扩容都会new一个新的数组,旧数组的最后一个元素指向新数组。并且数组的容量是bufferSize+1。SpscLinkedArrayQueue在设置数组容量时,也会做限制,如下:

int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));

最小是8,且一定是2的n次方。这是为了方便查找Index,即将求余数简化为n&(bufferSize-1),提高效率。

看一下onNext

        @Override
        public void onNext(T t) {
            if (done) {
                //一旦收到onError或onComplete,立即停止接收新的onNext事件
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

先不用考虑sourceMode,正常会将事件放入队列中,然后执行schedule

        void schedule() {
            if (getAndIncrement() == 0) {//每次调用使计数+1。只有为0时,才会执行worker.schedule,防止重复调用
                worker.schedule(this);
            }
        }

在worker线程执行run方法

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

正常执行到drainNormal:

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {//队列为空时,最多自旋两次尝试取数据(missed控制)
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {//循环取数据
                    boolean d = done;
                    T v;

                    try {
                        //从队列取数据
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {//数据去完,跳出该层循环
                        break;
                    }
                    //向下游分发数据
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

这样就完成了线程切换。
missed作用:若当前计数为10,则第一次调用,missed和当前计数变为9;第二次调用计数归0,完成两次自旋后跳出循环,防止cpu空转。这时因为置0了,所以若有新的onNext事件,schedule方法又可以重新调用worker.schedule,开启循环。
作者这种优化思路,非常值得借鉴。

流程图 箭头颜色代表不同线程

至此,针对上面的例子,有如下结论:

  1. 每次链式调用都会创建一个新的Observable对象
  2. onSubscribe在当前线程调用
  3. subscribeOn多次调用时,数据在第一次调用的线程中发送
  4. observeOn多次调用时,数据在最后一次调用的线程中向下分发

上游是下游的source,下游是上游的observer,可以理解为一个双向链表结构。
subscribeOn控制上游所在的线程,observeOn控制下游所在的线程,这样就好理解多了。


多次调用observer

先看下代码吧。ObserveOnObserver#onSubscribe被我们省略的部分代码:

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    //qd就是上一层的ObserveOnObserver对象
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    ...
                    if (m == QueueDisposable.ASYNC) {
                        //ASYNC
                        sourceMode = m;
                        //可以理解为就是上游的队列
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                //设置上游的标志位
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

下游的observeOn会拿到上游的队列。
然后在onNext中:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                //只有第一次的observeOn会执行入队逻辑
                queue.offer(t);
            }
            schedule();
        }

run方法:

        @Override
        public void run() {
            if (outputFused) {
                //非最后一个observeOn调用
                drainFused();
            } else {
               //最后一个observeOn调用
                drainNormal();
            }
        }

drainNormal已经介绍过了,就是从队列中取数据,向下分发。看下drainFused:

        void drainFused() {
            int missed = 1;

            for (;;) {
                if (cancelled) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if (!delayError && d && ex != null) {
                    actual.onError(error);
                    worker.dispose();
                    return;
                }

                //向下分发null
                actual.onNext(null);

                if (d) {
                    ex = error;
                    if (ex != null) {
                        actual.onError(ex);
                    } else {
                        actual.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

可以看到只是向下分发了null。这很好理解,因为最下层的observeOn可以直接从最上层的observeOn的队列中取得本次数据。

通过学习源码可以发现,作者在可靠性和性能上做了很多的工作,这都是值得我们学习的地方。


说在最后

Rxjava无疑是一个非常优秀的链式调用框架,能极大减少开发量。不过也有一些弊端不可忽视:

  1. jar包有2MB
  2. 调用过程产生大量的临时对象,使用不当会产生严重的内存问题

总而言之,因地制宜,量体裁衣。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容