RxJava 观察绑定和事件发送流程及其中的线程切换分析

本文的所有分析都是基于 RxJava2 进行的。以下的 RxJava 指 RxJava2
阅读本文你将会知道:

  • RxJava 的观察绑定和事件发送过程
  • RxJava 观察绑定和事件发送过程中的线程切换

从 RxJava1.0 到 RxJava2.0,在项目开发中已经使用了很长时间这个库了。链式调用,丝滑的线程切换很香,但是如果没弄清楚其中的奥妙很容易掉进线程调度的坑里。这篇文章我们就来对 RxJava 的订阅过程、时间发送过程、线程调度进行分析

订阅和事件流

先说结论

  • 按着代码书写顺序,事件自上向下发送
  • 订阅从 subscribe() 开始自下向上订阅,这也是整个事件流的起点,当订阅开始整个操作才会生效执行
  • 订阅完成后才会发送事件

图解

为了更便于理解订阅的流转方向,我将Observable调用 subscribe() 订阅描述为了 Observer beSubscribed()

订阅及数据发送

源码分析

Observabe 创建过程

此过程对应图中黑色箭头部分,以操作符中的map()操作为例:

   @CheckReturnValue
   @SchedulerSupport(SchedulerSupport.NONE)
   public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
       ObjectHelper.requireNonNull(mapper, "mapper is null");
       return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
   }

调用map操作符时,RxJavaPliguns 会注册一个新的 ObservableMap 对象,查看其它操作符会发现都有对应的 Observable 对象产生。同时,上游的 Observabe会作为 source 参数传入赋值给这个新的 Observablesource属性。层层向下,可以对这个新生成的 Observable又可以继续使用操作符。

订阅过程:

当调用最后一个 Observablesubscribe() 方法时,即开始订阅过程。此过程对应图中红色箭头部分

   @SchedulerSupport(SchedulerSupport.NONE)
   @Override
   public final void subscribe(Observer<? super T> observer) {
       ObjectHelper.requireNonNull(observer, "observer is null");
       try {
           observer = RxJavaPlugins.onSubscribe(this, observer);

           ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

           subscribeActual(observer);
       } catch (NullPointerException e) { // NOPMD
           throw e;
       } catch (Throwable e) {
           Exceptions.throwIfFatal(e);
           // can't call onError because no way to know if a Disposable has been set or not
           // can't call onSubscribe because the call might have set a Subscription already
           RxJavaPlugins.onError(e);

           NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
           npe.initCause(e);
           throw npe;
       }
   }

在调用subscribe(Observer) 时实际上会去调用各个 Observable实现子类中的 subscribeActual() 方法:

   @Override
   public void subscribeActual(Observer<? super U> t) {
       source.subscribe(new MapObserver<T, U>(t, function));
   }

而在这个subscribeActual() 方法也很简单,调用了 source 去订阅一个新生成的 Observer 对象,同时这个新的MapObserver会将调用subscribe()时传入的 observer,赋值给downstream属性。这样每一级订阅都会将上级的 Observable、本级生成的 Observer、订阅下级传入的Observer联系起来,直到达到 Observable 最初创建的地方整个订阅过程结束。

事件发送过程:

此过程对应图中绿色箭头部分Observable 事件起点创建有很多中操作符,他们都会创建出最初发送的事件/数据,以 ObservableCreate为例:

   @Override
   protected void subscribeActual(Observer<? super T> observer) {
       CreateEmitter<T> parent = new CreateEmitter<T>(observer);
       observer.onSubscribe(parent);

       try {
           source.subscribe(parent);
       } catch (Throwable ex) {
           Exceptions.throwIfFatal(ex);
           parent.onError(ex);
       }
   }

订阅时会调用source.subscrebe(parent),而这个source 又是从哪儿来的呢?

   public ObservableCreate(ObservableOnSubscribe<T> source) {
       this.source = source;
   }
   Observable.create(object : ObservableOnSubscribe<String> {
          override fun subscribe(emitter: ObservableEmitter<String>) {
               emitter.onNext("data")
          }

   })

从代码中我们可以看出,这个 source 即为我们创建时传入的 ObservableOnSubscribe,因此emitter.onNext("data")即是事件发送的起点。我们再继续看emitteronNext() 做了什么:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

源码中现实调用了observer.onNext(),而这个observer 则是前面订阅过程中 source.subscribe(new MapObserver<T, U>(t, function)) 传入的那个 observer,从而将事件发送到了下一级,下一级的 Observer 同样在 onNext() 将事件发送到更下一级,一直到最终我们 subscribe()时传入的那个Observer 实例完毕。

线程调度

事件订阅发送流程通过上面的文章基本已经能够摸清了,我们接下来关注另一个重点 线程调度问题。

调度方式

RxJava 中线程变换通过 subscribeOn()observeOn()两个操作来进行。其中 subscribeOn()改变的是订阅线程的执行线程,即事件发生的线程。observeOn()改变的是事件结果观察者回调所在线程,即 onNext()方法所在的线程。

举个栗子

使用 RxJava + Retrofit 进行网络请求时,用 RxJava 管理网络请求过程的线程切换。subscribeOn()指定的是网络请求的线程,observeOn()指定的是网络请求后事件流的执行线程。

源码分析

前面说过,每次操作符的使用,RxJava 都会生成一个对应的新的 Observable对象。observeOn()subscribeOn()也不例外。线程调度的核心逻辑都在 ObservableSubscribeOnObservableObserveOn两个类中

subscribeOn()过程

  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> subscribeOn(Scheduler scheduler) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
       return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
   }

调用 subscribeOn() 时会产生一个新的ObservableSubscribeOn并把当前这个Observable 和传入的 Scheduler作为参数传入。前面分析过当最终调用 subscribe()时会引起整个观察链的 Observable 自下而上调用 subscribe(),而这个subscribe()方法中实际为调用抽象类 Observable的各个实现子类的 subscribeActual()方法 。

   @Override
   public void subscribeActual(final Observer<? super T> observer) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

       observer.onSubscribe(parent);

       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }

主要看这句 scheduler.scheduleDirect(new SubscribeTask(parent));,SubscribeTask 前面内容已经分析过,就是调用上级 Observable 来订阅生成的这个 SubscribeOnObserver

   @NonNull
   public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       final Worker w = createWorker();

       final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

       DisposeTask task = new DisposeTask(decoratedRun, w);

       w.schedule(task, delay, unit);

       return task;
   }

scheduleDirect 方法,会使用传入的 scheduler 在指定的线程创建一个 Worker 对象来执行SubscribeTask,从而达到了切换订阅线程的目的。所以多个subscribeOn()叠加时,最终线程还是会回到最后执行的(代码第一次出现的)subscribeOn() 指定的线程。

observeOn()过程

调用 observeOn(Scheduler) 方法,会调用内部的同名方法生成一个新的 ObservableObserveOn对象,并把当前这个Observable 和传入的 Scheduler作为参数传入。订阅过程与ObservableSubscribeOn不一样,会直接在当前线程调用上级Observable订阅自己,,我们主要看ObservableObserveOnObserveOnObserver是如何调度结果数据发送的线程的。

       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);
           }
           schedule();
       }

       void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);
           }
       }

从源码中可以发现,最终会使用 worker 去向下游发送事件。这个 worker就是我们observeOn() 方法中指定的线程创建的 worker。从而达到切换线程的目的,由于事件又是自上而下的,所以每次切换都能在下游事件中感受到线程的变化。

日志分析

subscribeOn()observeOn()放一起来说不太容易说明白其中的线程变换,我先看看单独使用其中的一个操作符的时候,导致的线程变化。

仅调用 subscribeOn() 调度线程

Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.io()) 
                .doOnSubscribe {
                    Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.newThread())
                .doOnSubscribe {
                    Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete() {

                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })

执行结果:

image.png

从日志可以看出:

  • 1、订阅是自下向上的(onSubscribe -->doOnSubscribe 2 -->doOnsubscribe 1)
  • 2、自下向上看,每次调用 subscribeOn 订阅线程将会发生改变,直到下次调用 subscribeOn
  • 3、事件是自上向下传递的(Map 1 --> Map 2 --> Map 3 --> onNext),且所在线程为最后一次线程切换后所在的线程 RxCachedThreadScheduler-1

仅调用 subscribeOn() 调度线程

        Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
//                .doOnSubscribe {
//                    Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
//                }
//                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
//                .doOnSubscribe {
//                    Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
//                }
//                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete() {

                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })

执行结果:

日志打印

从日志可以看出:

  • 1、事件发送是正常的自上向下(Map 1 --> Map 2 --> Map 3 --> onNex)
  • 2、自上向下,每次调用 observeOn 观察结果回调线程都将切换一次(main -->RxNewThreadScheduler-1 -->RxNewThreadScheduler-2)

混合使用调度线程

我们把上述代码中注释部分都打开,得到的日志如下:

日志打印

通过上面的三次日志打印我们可以看出:

订阅链的日志自下而上打印完毕后,再自上而下打印观察结果。subscribeOn 会切换线程,并不是像有的文章所说只有第一次指定线程(即自下而上的最后一次)有效。第一次有效只是我们的错觉,因为订阅是自下而上的,不管前面的线程怎样切换追踪都会切换到 subscribeOn第一次指定线程(即自下而上的最后一次)。我们在回调结果中未进行线程切换操作时,只能感知到这一次线程切换 (Map1 与 doOnSubscribe 1 所在线程一致)。observeOn的每次指定线程都会让事件流切换到对应的线程中去。完整的事件订阅和发送流程如下图所示,从我们调用 subscribe()将观察者和观察对象关联起来开始,subscribe() 中传入的 Observer 的 onNextonError结束,形成了一个逆时针的 n 形的链条。右边部分的观察链中,每次 subscribeOn 都会切换观察线程。左边部分的事件发送链,会从观察链的最后一次指定的线程开始发送事件,每次调用 observeOn都会指定新的事件发送线程。

图解

参照上面的源码和日志分析,再结合本图相信大家会对 RxJava 的现场调度有一个更立体的认识

RxJava2 线程切换流程
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355

推荐阅读更多精彩内容

  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 678评论 0 2
  • 来自于:CSDNblog.csdn.net/caihongdao123/article/details/51897...
    于加泽阅读 1,283评论 0 5
  • 在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github: https://...
    松江野人阅读 5,896评论 0 1
  • 做事贵乎有组织、有步骤,即有既定之程序。想成功,必要有组织的智慧,大小事,一也。
    其实很想说阅读 120评论 0 0
  • 玩弄男人于股掌之间。 拥有颠倒众生的美丽资本。 个性解放和追求自由的精神状态。 你这个妖女。听上去完全可以把石化作...
    晴天_ebfd阅读 422评论 0 0