RxJava2 源码解析——线程调度 Scheduler

RxJava源码解析第二篇。
我们知道,在使用RxJava的时候,线程的调度是其内部帮我们实现的,这让我们可以便捷的实现函数式编程。
本文主要从源码的角度来分析RxJava的线程调度机制
= =最近被项目搞疯都没什么时间写笔记了。


引入

我们知道,线程调度主要通过observeOnsubscribeOn这两个方法,以及Schedular来指定使用的线程。
还是以上一次的代码为例:

Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
        @Override
        public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
            e.onNext(login());
        }
    }) //调用登录接口
    .map(new Function<LoginApiBean, UserInfoBean>() {
        @Override
        protected UserInfoBean decode(LoginApiBean loginApiBean) {
            //处理登录结果,返回UserInfo
            if (loginApiBean.isSuccess()) {
                return loginApiBean.getUserInfoBean();
            } else {
                throw new RequestFailException("获取网络请求失败");
            }
        }
    })
    .doOnNext(new Consumer<UserInfoBean>() {    //保存登录结果UserInfo
        @Override
        public void accept(@NonNull UserInfoBean bean) throws Exception {
            saveUserInfo(bean);
        }
    })
    .subscribeOn(Schedulers.io())   //调度线程
    .observeOn(AndroidSchedulers.mainThread())  //调度线程
    .subscribe(new Consumer<UserInfoBean>() {
        @Override
        public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
            //整个请求成功,根据获取的UserInfo更新对应的View
            showSuccessView(bean);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            //请求失败,显示对应的View
            showFailView();
        }
    });

我们知道,通过:

.subscribeOn(Schedulers.io())   //调度线程
.observeOn(AndroidSchedulers.mainThread())  //调度线程

这两句代码,就使我们上半部分的请求和保存数据都执行在io线程中,而下半部的ui更新则执行在主线程。

通过这段代码,我们引入几个问题:

  1. observeOn和subscribeOn是如何实现线程调度的?
  2. observeOn和subscribeOn之间是否存在冲突?

observeOn源码

首先解决第一个问题,我们先了解一下ObserveOn的实现原理:

首先看一下调用:

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
}

我们可以看到,ObserveOn最终是返回了一个ObservableObserveOn对象,并将scheduler传入。

根据上一篇文的思路:


任务链.png

ObservableObserveOn会被我们最后subscribe的时候传入的Observer订阅。

让我们跟进看一下ObservableObserveOn被订阅时会执行什么逻辑:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //TrampolineScheduler 表示当前线程
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //根据scheduler创建worker
        Scheduler.Worker w = scheduler.createWorker();
        //通过ObservableObserveOnObserver代理
        source.subscribe(new ObservableObserveOn.ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

这里的逻辑并不难理解,(如果看了上一篇文章),

首先是判断了scheduler是不是表示当前线程的TrampolineScheduler,如果是就直接让observer订阅上一级的Observable,也就是跳过当前这一层,即图中的Observer直接订阅ObservableSubscribeOn

然后根据schedular生成对应的worker,交由ObservableObserveOnObserver代理,订阅上一级的Observable

根据我们引入的案例,我们以observeOn(AndroidSchedulers.mainThread()) 为例,当完成逆向订阅,执行任务链到ObservableObserveOnObserver时:

@Override
public void onNext(T t) {
    // 上一级的模式如果不是异步的,加入队列
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //进行线程调度
    schedule();
}

void schedule() {
    // 判断当前正在执行的任务数目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

这里首先是判断了sourceMode,这里先不跟踪这个变量,只需要知道大多数情况下,这个判断是成立,所以会把数据加入队列。

然后转而让worker执行接下去的步骤。

我们跟踪看看,可以发现,这是个抽象方法,可以找到他在不同类中有不同实现,分别对应了几种不同的线程调度机制,我们挑选案例中的AndroidSchedulers.mainThread()来跟踪。

首先我们跟踪mainThread方法,可以发现内部转到了这里:

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

我们再跟进HandlerScheduler,我们知道worker是通过createWorker方法产生的:

public Worker createWorker() {
   return new HandlerWorker(handler);
}

可以看到直接生成了HandlerWorker,并传入了一开始创建的绑定了MainLooperHandler。看到这里也能大致猜出,后续会把任务传给这个handler执行:

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    //省略部分代码
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    return scheduled;
}

可以看到,这里将传进来的runnable包装成ScheduledRunnable,然后提交给绑定的handler

我们知道,后续Handler会调用ScheduledRunnable的run方法:

ScheduledRunnable(Handler handler, Runnable delegate) {
    this.handler = handler;
    this.delegate = delegate;
}

@Override
public void run() {
    try {
        delegate.run();
    } catch (Throwable t) {
        //……
    }
}

可以看到,只是简单的调用了我们传入的runnablerun方法,也就是刚才我们在ObservableObserveOnObserver中通过schedule方法传入的runnable,我们回去看看:

void schedule() {
    // 判断当前正在执行的任务数目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

可以看到其实本身就是个runnable

@Override
public void run() {
    //输出结果是否融合
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

可以看到,根据outputFused来跳转方法,这里先不跟踪这个变量,后面会再提到。
现在只需要知道当连续两个observable都需要线程调度时(比如从observeOnobserveOn),这个outputFused才会发生变化,默认为false。

那么这里,我们先进入drainNormal方法:

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    //第一层循环
    for (;;) {
        // 检查异常处理
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        //第二层循环
        for (;;) {
            boolean d = done;
            T v;
            //从队列中获取数据
            v = q.poll();
            boolean empty = v == null;
            // 检查异常
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //如果没有数据了,跳出
            if (empty) {
                break;
            }
            //执行下一次操作。
            a.onNext(v);
        }
        //减掉执行的次数,并获取剩于任务数量,然后再次循环
        //直到获取剩余任务量为0,跳出循环
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

这里的逻辑其实也不难,具体可以看注释。

到这里其实已经切换了线程,然后就是分发数据,逐个调用onNext操作了。直到没有数据就跳出循环。(总觉得这里missed的设计很奇怪- -为什么是初始化1而不是missed=get()呢。望有大神解答~)

看到这里也就大致明白了ObserveOn的流程呢。

总结一下:
ObserveOn会用一个queue保存上一级传下来的数据,然后通过scheduler创建一个worker,提交数据,并将任务执行在worker设置的线程中。

subscribeOn源码

看完ObserveOn,我们看一下subscribeOn,
首先看一下当他被订阅时会执行什么操作:

@Override
public void subscribeActual(final Observer<? super T> s) {
    //创建对应的Observer
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    //执行线程调度,内部会订阅上一级的Observable
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

可以看到,这里直接进行了线程调度,创建了SubscribeTask任务,然后交由Scheduler执行。

我们先看看scheduleDirect会执行什么操作:

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Scheduler.Worker w = createWorker();

    Scheduler.DisposeTask task = new Scheduler.DisposeTask(run, w);

    w.schedule(task, delay, unit);

    return task;
}

可以看到,这里和我们刚才追踪ObserveOn时的逻辑一样。都是将任务交给了Worker处理。我们刚才已经分析了,Worker会将任务提交给对应的线程执行。

所以我们回过头看一下我们提交了什么任务:

@Override
public void run() {
    source.subscribe(parent);
}

可以看出,这里将订阅的操作提交给了Worker执行。

总结一下:
subscribeOn会将订阅上一级的操作调交给worker中对应的线程执行。

ObserveOn和subscribeOn

我们还是以上述引入的例子为例,可以看出,整个过程进行了两次线程调度,首先是subscribeOn,然后是ObserveOn,这个过程比较简单,先解析这个过程。

根据上一篇文章的分析,RxJava的整个流程分为三个步骤:

  1. 创建任务链,这里没有涉及线程调度。默认执行在当前线程,在这里也就是主线程

  2. 逆向订阅,这里当遇到ObserveOn的时候,ObserveOn直接进行了订阅操作,所以没有影响。
    但是但我们订阅ObservableSubscribeOn的时候,其便将订阅操作提交到了对应线程,所以后续的订阅操作都执行在对应线程,在这里便是IO线程

  3. 执行任务链,受到ObservableSubscribeOn的影响,这里也会继续执行在IO线程
    但是当我们执行到ObserveOnObserver的时候,onNext操作会执行在对应的线程中,在这里也就是切换到主线程

线程调度.png

图中,紫色的箭头表示执行在默认线程(主线程),红色的箭头表示执行在IO线程,绳蓝色的线表示执行在切换后的主线程

observeOn和subscribeOn之间是否存在冲突

其实从上述的例子我们可以看出并不存在冲突的问题,一个影响的subscribe之后的操作,一个影响的是doNext之后的操作。

从图中可以看出,不管subscribeObserveOn怎么变化,都不会发生冲突的情况。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容