理解RxJava(三)线程调度原理分析

概述

在我的上一篇文章 《理解RxJava(二)操作符流程原理分析》 中,分析了依靠多个操作符链式调用的原理。

简单总结如下:

1.创建:订阅前,每一步都生成对应的Observable对象,中间的每一步都将上游的Observable存储;
2.订阅: 每一步都会生成对应的Observer对上一步生成并存储的Observable进行订阅。订阅的执行顺序是由下到上的。
3.执行:先执行每一步传入的函数操作,然后将操作后的数据交给下游的Observer继续处理。 数据的传递和处理顺序是由上到下的。

本文我将尝试对RxJava最核心的 线程调度 的原理进行分析。

基本代码

来看一下基本代码:

    @Test
    public void test() throws Exception {
        Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(i -> System.out.println("onNext : i= " + i));
    }

很简单,即订阅时将task交给子线程去做,而数据的回调则在Android主线程中执行。

一、subscribeOn()

点击查看源码:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //非空判断和hook
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

有了前两篇文章的基础,我们很清楚,排除掉非空判断和hook相关逻辑,实际上这个方法返回了一个ObservableSubscribeOn对象。

我们有理由猜测这个ObservableSubscribeOn应该和上文的ObservableMap及ObservableDoOnEach相似,都是Observable的一个包装类(装饰器):

//1.和上文基本一样,ObservableSubscribeOn也是Observable的一个装饰器
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
       //2.存储上游的ObservableSource和调度器
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //3.new 一个SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //4.回调方法,这说明下游的onSubscribe回调方法所在线程和线程调度无关
        //  是订阅时所在的线程
        s.onSubscribe(parent);

        //5.立即执行线程调度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

前两步我们不需要 再多解释,直接看第三点,我们看看SubscribeOnObserver这个类:

SubscribeOnObserver

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        //下游的Observer
        final Observer<? super T> actual;
        //保存上游的Disposable,自身dispose时,连同上游一起dispose
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

类似Observable和ObservableMap,SubscribeOnObserver同样是DisposableObserver的一个装饰器,提供了对下游数据的传递,以及将task dispose的接口。

第4步我们之前就讲过了,直接看第5步:

         //5.立即执行线程调度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

我们看看SubscribeTask这个类:

SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

难以置信的简单,SubscribeTask 仅仅是一个Runnable 接口的实现类而已,通过将SubscribeOnObserver作为参数存起来,在run()方法中添加了上游Observable的被订阅事件,就没有了别的操作,

接下来我们看一下scheduler.scheduleDirect(SubscribeTask)中的代码:

public abstract class Scheduler {
    //...
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // Worker 本身就是Disposable 的实现类
        // 请注意, createWorker()所创建的worker,
        // 实际就是Schdulers.io()所提供的IoScheduler所创建的worker
        final Worker w = createWorker();
        
        //hook相关
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        
        //即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 立即执行task
        w.schedule(task, delay, unit);

        return task;
    }
    //...
}

我们不要追究过深,我们看一下这个createWorker方法的注释说明:

    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     * 检索或创建一个新的{@link Scheduler.Worker}表示一系列的action
     * 
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
     * 当work完成后,应使用{@link Scheduler.Worker#dispose()}取消订阅。
     * 
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *  {@link Scheduler.Worker} 上面的work保证是顺序执行的
     */

现在我们知道了:

我们通过调用subscribeOn()传入Scheduler,当下游ObservableSource被订阅时(请注意,订阅顺序是由下到上的),距离最近的线程调度subscribeOn()方法中,保存的Scheduler会创建一个worker(对应相应的线程,本文中为IoScheduler),在其对应的线程中,立即执行task

关于对应的Worker相关和IoScheduler相关,篇幅所限,本文不做细讲,有兴趣的同学可以自行研究。

多次subscribeOn()

现在考虑一个问题,假如在我们的代码中,多次使用了subscribeOn()代码,到线程会怎么处理呢?

上文已经讲到了,不管我们怎么通过subscribeOn()方法切换线程,由于订阅执行顺序是由下到上,因此当最上游的ObservableSource被订阅时,所在线程当然是距离上游最近的subscribeOn()所提供的线程,即最终Observable总是在第一个subscribeOn()所在的线程中执行

二、observeOn()

先看observeOn()内部,果然是hook+Observable的包装类:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");

        //实例化ObservableObserveOn对象并返回
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

再看ObservableObserveOn:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        //1.相关依赖注入
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //2.创建主线程的worker
            Scheduler.Worker w = scheduler.createWorker();
            //3.上游数据源被订阅
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

和subscribeOn()不同的是,我们并不是立即在对应的线程执行task,而是将对应的线程(实际上是worker)作为参数,实例化ObserveOnObserver并存储起来。

当上游的数据传递过来时,ObserveOnObserver执行对应的方法,比如onNext(T),再切换到对应线程中,并交由下游的Observer去接收:

ObserveOnObserver

ObserveOnObserver中代码极多,我们简单了解原理后,以onNext(T)为例:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        //...省略其他代码
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        //队列
        SimpleQueue<T> queue;

       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //将数据存入队列
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            } 
            //对应线程取出数据并交由下游的Observer
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
         //...省略其他代码
}

多次observerOn()

由上文得知,与subscribeOn()相反,observerOn()操作会将切换到对应的线程,然后交由下游的Observer处理,因此observerOn()仅对下游的Observer生效,并且,如果多次调用,observerOn()的线程调度会持续到下一个observerOn()操作之前

我在《RxJava(11-线程调度Scheduler)》 @open-Xu文章中看到了这张图片,完美诠释了observerOn()的原理:

observerOn()的原理

注意:

接下来的内容都是@open-Xu在他的文章《RxJava(11-线程调度Scheduler)》中总结的笔记,为了方便以后翻阅,特此转载过来,下文的转载请@原作者open-Xu并注明原文地址,谢谢!

调度器的种类

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread( ) 主线程,UI线程,可以用于更新界面

总结

subscribeOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,先切换线程,然后立即执行task;
  • 当存在多个subscribeOn()方法时,仅第一个subscribeOn()有效

observerOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,会将对应的worker创建并作为构造参数存储在Observer的装饰器中,并不会立即切换线程
  • 当数据由上游发送过来时,先将数据存储到队列中,然后切换线程,然后在新的线程中将数据发送给下游的Observer
  • 当存在多个observerOn()方法时,仅对距下游下一个observerOn()之前的observer有效

参考文章

1.《RxJava(11-线程调度Scheduler)》 @open-Xu

2.《RxJava2 源码解析(二)》@张旭童

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,493评论 7 62
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 6,884评论 2 50
  • 原文地址:http://gank.io/post/560e15be2dca930e00da1083 前言 我从去年...
    AFinalStone阅读 2,197评论 5 23
  • 前言 终究没有经受住RxJava的诱惑,只恨自己来的比较晚,走起~ RxJava 是什么? 一个在 Java VM...
    王永迪阅读 4,280评论 3 37
  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,634评论 13 85