RxJava 线程切换之subscribeOn源码分析

首先看下我们RxJava的常规使用方法

代码A 调用类

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext(XXX);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.e("tag", s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }

});

1、Schedulers.io()为IoScheduler

下面看下subscribeOn(Schedulers.io())这个方法,把代码贴出来

代码B ObservableCreate类

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

最后真正执行的是ObservableSubscribeOn类中的subscribeActual方法

代码C ObservableSubscribeOn类

@Override

public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

形参 s 为观察者,在外层执行Observable.subscribe(Observer)之后,观察者的onSubscribe()方法是首先会被调用的,调用位置便在 代码C 中的s.onSubscribe(parent)。

重点来看一下 代码C 中的最后一行代码。首先看下SubscribeTask类。

代码D ObservableSubscribeOn类

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

实际上SubscribeTask就是一个Runnable类,在其run方法中,执行了source.subscribe(parent);其中source就是我们 代码B 中new ObservableSubscribeOn<T>(this, scheduler)传入的this,在这里也就是ObservableCreate类,parent就是 代码C 中s的包装类,在这里可以看成是观察者类。

接着看下 代码C 中的 scheduler.scheduleDirect(new SubscribeTask(parent))

//scheduleDirect源码如下

代码E Scheduler类(IoScheduler)

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

createWorker() 方法是一个抽象方法,IoScheduler类的具体实现如下,new了一个 EventLoopWorker
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

pool.get()是从线程池CachedWorkerPool中取一个线程NewThreadWorker

重点是在 代码E中的w.schedule(task, delay, unit),即eventLoopWorker.schedule方法,一直跟下去,到最终调用处如下

代码F NewThreadWorker类

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

因为我们也没有设置delay时间,所以我们重点看下executor.submit((Callable<Object>)sr);

经过层层传递,其中传入的参数sr即为 代码C 中的SubscribeTask。此时此刻,基本上都清晰了,在这里执行submit放法,实际上就是执行我们 代码D 中的source.subscribe(parent);由于此处是在子线程中调用,所以能满足在最外层调用
subscribeOn(Schedulers.io())之前的上游代码都运行在子线程中。

说到上游,我们回到代码A。ObservableSubscribeOn的上游便是我们通过方法Observable.create()创建的ObservableCreate类。

代码D 中的source.subscribe(parent)便相当于直接调到了ObservableCreate类的subscribe()方法。相同的套路,最终都会调到ObservableCreate类的subscribeActual()方法。

代码G ObservableCreate类

@Override

protected void subscribeActual(Observer<? super T> observer) {

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

source.subscribe(parent)中的source即为 代码A 中Observable.create(new ObservableOnSubscribe<String>())方法传入的ObservableOnSubscribe类。调用source.subscribe()方法,即调用 ObservableOnSubscribe 类中的 subscribe() 方法。

ObservableOnSubscribe为一个接口,代码如下:

public interface ObservableOnSubscribe<T> {

    /**
    * Called for each Observer that subscribes.
    * @param e the safe emitter instance, never null
    * @throws Exception on error
    */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;

}

这下便回到了我们熟悉的外部调用。其中形参 e 便为 代码G 中第9行传入的parent,即observer的包装类。

e.onNext(XXX)方法的分析请看下一篇文章RxJava线程切换之observeOn源码分析

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容