RxJava:线程切换

上一篇:RxJava:基本订阅流程

我们在Rxjava中最常用的两个方法:

  • subscribeOn(Scheduler scheduler)
  • observeOn(Scheduler scheduler)

通常,我们使用subscribeOn(Schedulers.io())方法指定在IO线程中订阅-----进行数据处理,observeOn(AndroidSchedulers.mainThread())方法指定在主线程中观察-----进行UI操作。

so,Rxjava是如何进行线程切换的?

Observable.subscribeOn(Scheduler scheduler)

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));//1
    }

​ 可以看到,subscribeOn()方法与Observable.create()类似,都返回了一个Observable<T>对象,其内部主要调用了ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler)方法,构造了一个ObservableSubscribeOn对象。ObservableSubscribeOn类不长,我们贴出它的全部代码,如下:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {//2
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        //Disposable用来做资源处理,在这里我们不用关心   
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {//3
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
}

​ ObservableSubscribeOn类继承关系如下:

ObservableSubscribeOn<T> extends AbstractObservableWithUpstream
bstractObservableWithUpstream<T, U> extends Observable<U>

​ ObservableSubscribeOn类也是一个Observable,从名字上也可以看出,这是一个进行了SubscribeOn的Observable。那么,他和我们上一篇提到的ObservableCreate类最大的区别在哪里?

​ 再贴出Observable实现类中的关键方法:ObservableCreate.subscribeActual(Observer<? super T> observer),做一个对比:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    • ObservableCreate内部定义了发射器类-----CreateEmitter,observer.onSubscribe()方法传入的是CreateEmitter发射器对象。

    • ObservableSubscribeOn内部定义了一个SubscribeOnObserver对象,subscribeActual方法中,s.onSubscribe()传入的是这个SubscribeOnObserver对象。即对Observer做一次重新包装,放入一个Runnable中执行。

      SubscribeOnObserver也是一个Observer,将我们传入的Observeable进行了包装

    • ObservableCreate.subscribeActual中,执行了source.subscribe(parent)即使用发射器执行我们定义的subscribe(ObservableEmitter<Bundle> e)方法;

    • ObservableSubscribeOn.subscribeActual中,调用的是scheduler.scheduleDirect(Ruunable run),传入一个Runnable,来执行source.subscribe(parent)

      我们使用发射器发射的数据,是通过subscribe()调用的,而subscribe()真正被调用的,是在Observable实现类ObservableSubscribeOn.subscribeActual()。

我们主要看一下关键逻辑步骤:scheduler.scheduleDirect()方法,了解Runnable是如何在我们指定线程中执行的。

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

​ 我们可以看到,其内部调用的了Worker.schedule()方法,来实际执行我们传入的Runnable。而createWorker()是一个抽象方法,其具体实现由我们传入的Scheduler的实现类决定,即我们调用subscribeOn(Scheduler scheduler)中传入的Scheduler。在上文代码中,我用注释//1 //2 //3标明了这条引用链。

​ 我们就以Schedulers.io()为例:我们具体看下它的实现。我们先来看一下Schedulers.io()所代表的Scheduler:

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    static {
    ......
        IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
                @Override
                public Scheduler call() throws Exception {
                    return IoHolder.DEFAULT;
                }
            });
    ......
    }
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

Schedulers.io()返回的是一个Scheduler对象。在IoScheduler类中,createWorker()返回了一个EventLoopWorker对象,EventLoopWorker.schedule方法如下:

        public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

​ scheduleActual方法中,对Runnable进行了一系列封装,最终使用的是一个Android的一个ExecutorService实现类----ScheduledExecutorService来执行。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        parent.remove(sr);
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

​ 这个executor构造如下:

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

​ 我们从线程池中获取一个ExecutorService,使用该ExecutorService执行我们传入的Runnable。总结以上:Observable.observeOn(Schedulers.io()) 切换子线程的原理在于:ObservableOnSubscribe中自定义的subscribe(),实际被传入的Scheduler实现类的Worker.schedule()方法指定线程中执行

Observable. observeOn(Scheduler scheduler)

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

​ observeOn()是一个final方法,所以所有的Observeable都是调用的该方法,它实际上返回的是一个ObservableObserveOn对象实例。在切换完线程后使用subscribe()方法定义订阅观察者进行ui操作,在上一篇已经讲到,Observeable.subscribe实际执行的是Observable实现类的subscribeActual方法,所以,我们需要关注的是ObservbleObserveOn.subscribeActual()

ObservableObserveOn.subscribeActual()代码如下:

protected void subscribeActual(Observer<? super T> observer) {
    //如果传入的scheduler指代当前线程,不进行线程切换,直接调用原observer的subscribe()方法,此处的      //source是上游的ObservableSource(Observable)
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

​ 这里重点关注的是else中的线程切换流程,和subscribeOn()方法进行线程切换类似,也是由传入的Scheduler决定Worker。这里引入ObserveOnObserver类,传入的observer进行封装,即决定observer的运行线程。

​ ObserveOnObserver类实现了Runnable接口,run方法如下:

//ObserveOnObserver实现的run方法,drainFused和drainNormal中执行了Observer的回调方法。
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

​ ObserveOnObserver的worker主要作用于schedule()方法,而schedule()如下,即调用worker.schedule()方法,执行当前ObserveOnObserver。

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

​ So,observeOn方法也是通过Scheduler来实现线程切换的。而AndroidSchedulers.mainThread()所代表的Scheduler如下:

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

​ 在schedle()方法中,我们看到是通过handler,将传入的Runnable发送到主线程执行。

HandlerWorker(Handler handler) {
    this.handler = handler;
}

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);
    //将传入的Runnable和handler封装成一个ScheduledRunnable,作为Message的Callback
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //使用Handler将消息发送MainLooper
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,460评论 7 62
  • 在上一小节中RxJava2_整体流程分析,有这么一个结论,那就是每一次调用 Observable 的操作符都会返回...
    未见哥哥阅读 1,492评论 0 3
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj阅读 1,842评论 0 21
  • 0.版权声明 本文由玉刚说写作平台提供写作赞助,版权归玉刚说微信公众号所有原作者:四月葡萄版权声明:未经玉刚说许可...
    四月葡萄阅读 744评论 0 3
  • 前言 通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非...
    CuiTao阅读 566评论 0 4