RxJava 2.0 源码分析二(线程切换原理 )

RxJava2线程切换简单用例:

通过subscribeOn()方法指定被观察者的工作线程,以及observeOn()指定观察者的工作线程。

 Observable.create(ObservableOnSubscribe<Int> { e ->
            Log.i("RxJava", "subscribe运行在" + Thread.currentThread())
            e.onNext(0)
            e.onComplete()
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer<Int> {
                    override fun onSubscribe(d: Disposable) {
                        Log.i("RxJava", "onSubscribe运行在" + Thread.currentThread())
                    }

                    override fun onNext(integer: Int) {
                        Log.i("RxJava", "onNext运行在" + Thread.currentThread())
                    }

                    override fun onError(e: Throwable) {}

                    override fun onComplete() {
                        Log.i("RxJava", "onComplete运行在" + Thread.currentThread())
                    }
                })
    }

运行结果:
运行结果打印.png

Observable#subscribeOn
指定了被观察者Observable的工作线程,需要的参数类型为Scheduler

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //传递Observable自身和scheduler作为参数,创建ObservableSubscribeOn对象并返回
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
  1. 在上一篇源码分析一中可以知道,Observable#subscribeOn最后会返回ObservableSubscribeOn的实例。
  2. 在用例当中,调用了Schedulers.io()作为Scheduler参数,传递给Observable#subscribeOn方法。

Schedulers.io()

    //Schedulers.java
    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

Schedulers.io()拿到的是IoScheduler的对象,RxJava的套路就是调用A类的B方法,最后返回一个BA对象。需要了解IoScheduler是怎么得到的,可以去查询一下源码,这里比较简单,不展开分析了。

IoScheduler

public final class IoScheduler extends Scheduler {

    //RxThreadFactory继承了ThreadFactory接口
    static final RxThreadFactory WORKER_THREAD_FACTORY;

    static {
        ...
        
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
        ...
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }

    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;//线程池
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
    }

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

}

1.IoScheduler被创建时调用了无参构造方法,内部再调用了带有ThreadFactory的重载构造方法。
2.WORKER_THREAD_FACTORY变量在静态代码块中被初始化,类型为RxThreadFactory。

再回到Observable#subscribeOn(Scheduler scheduler)

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //创建ObservableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

/**
  * ObservableSubscribeOn对象内部保存了传递进来的Scheduler对象。
  * ObservableSubscribeOn是Observable的子类。
  **/
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

所以Observable调用subscribeOn(Scheduler scheduler)方法后,会创建ObservableSubscribeOn的对象,并且把上一步传递进来的ObservableCreate对象和IOScheduler 对象保存起来。

Observable#observeOn
这个方法指定了Observer的工作线程,需要的参数类型为Scheduler

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

在用例当中,调用了AndroidSchedulers.mainThread()作为Scheduler参数,传递给Observable#subscribeOn方法。

AndroidSchedulers.mainThread()

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

AndroidSchedulers.mainThread()方法最后得到的是HandlerScheduler对象(按照RxJava套路,这里应该返回一个叫做MainThreadScheduler对象,但是这里居然不按套路来)。

再回到Observable#observeOn(Scheduler scheduler)

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ...
        //创建一个ObservableObserveOn对象并把scheduler保存起来
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

}

observeOn方法和subscribeOn方法基本也是一样的逻辑,创建ObservableObserveOn对象,并且把上一步传递过来的ObservableSubscribeOn对象和HandlerScheduler对象保存起来。

Observable#subscribe()
Observable对象实例为ObservableObserveOn,而根据RxJava的Observable#subscribe(observer)方法的套路,所以会调用ObservableObserveOn的subscribeActual方法。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //scheduler为HandlerScheduler的类型
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //用ObserveOnObserver订阅上游数据源。这样当数据从上游发送下来,会由ObserveOnObserver对应的onXXX()处理
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

source对象是在ObservableObserveOn创建时传递进来的,实例为ObservableSubscribeOn对象。所以会调用ObservableSubscribeOn的subscribeActual()方法。

ObservableSubscribeOn#subscribeActual()

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //创建SubscribeOnObserver实例
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //s为ObserveOnObserver对象,回调ObserveOnObserver的onSubscribe方法
        //最后回调observer的onSubscribe
        s.onSubscribe(parent);
        
        //SubscribeTask继承了Runnable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

  //SubscribeTask中的source为ObservableCreate对象
  //run方法的执行会回调ObservableOnSubscribe的subscribe方法
  final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
  1. 把ObserveOnObserver对象传递给SubscribeOnObserver的构造方法,创建SubscribeOnObserver对象。
  2. 回调ObserveOnObserver的onSubscribe()方法,最终回调到observer的onSubscribe()方法。
  3. 创建SubscribeTask对象,调用scheduler对象的scheduleDirect()方法,然后把返回值传递给SubscribeOnObserver对象。

Scheduler#scheduleDirect(Runnable)

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //createWorker方法在Scheduler类中为抽象方法,Scheduler的实例为IoScheduler
        //调用IoScheduler的createWorker()方法, 返回的实例为EventLoopWorker
        final Worker w = createWorker();
        
        //decoratedRun就是run对象
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        
        //把Worker和Runnable传递给DisposeTask,创建DisposeTask的实例
        DisposeTask task = new DisposeTask(decoratedRun, w);

        //调用EventLoopWorker的schedule()方法
        w.schedule(task, delay, unit);

        return task;
    }

    //IoScheduler.java
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

    //IoScheduler$EventLoopWorker.java
    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
  1. scheduleDirect方法首先调用createWorker方法,createWorker方法在Scheduler类中为抽象方法,而在IOScheduler中createWorker()方法返回EventLoopWorker对象。
  2. 把Worker和Runnable传递给DisposeTask,创建DisposeTask的实例,DisposeTask继承了Runnable和Dispose接口。
  3. 调用EventLoopWorker的schedule()方法,然后会调用到IoScheduler$ThreadWorker的schedule()方法,ThreadWorker继承自NewThreadWorker类,进入NewThreadWorker的scheduleActual()方法。

NewThreadWorker#scheduleActual()

    private final ScheduledExecutorService executor;

    public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //把Runnable封装到ScheduledRunnable对象中
        //ScheduledRunnable继承了Runnable和Callable和Disposable接口
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
  
        ...
        
        //把sr提交给线程池executor执行
        //sr封装了ObservableSubscribeOn的内部类SubscribeTask对象
        //最后会执行SubscribeTask对象的run方法
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            ...
        }

        return sr;
    }

    //SchedulerPoolFactory#create()
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }
  1. NewThreadWorker的scheduleActual方法中的executor对象是通过Executors.newScheduledThreadPool()方法创建出来的定长线程池。
  2. 通过线程池executor执行Runnable包装后的Callable对象。
  3. Runnable对象是ObservableSubscribeOn的subscribeActual方法中创建的SubscribeTask对象。
  4. 所以Observable被观察者的工作线程是Executors来启动的,回调到SubscribeTask的run方法中。

SubscribeTask#run()

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //source为ObservableCreate
            //parent为SubscribeOnObserver
            source.subscribe(parent);
        }
    }

    
    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i("RxJava: subscribe运行在", "" + Thread.currentThread());
                e.onNext(0);
            }
        })
  1. run方法这里的工作线程为子线程。
  2. 在ObservableSubscribeOn对象中,source为ObservableCreate对象,source.subscribe(parent)的执行会回调最终会使用例子中的ObservableOnSubscribe#subscribe()。
  3. ObservableOnSubscribe#subscribe()的执行会调用e.onNext()方法, e的实例对象为ObservableSubscribeOn的内部类SubscribeOnObserver对象,SubscribeOnObserver封装了ObservableObserveOn对象。
  4. 最后会调用ObservableObserveOn子类的ObserveOnObserver的onNext()

ObservableObserveOn$ObserveOnObserver#onNext()

        @Override
        public void onNext(T t) {
            ...
            if (sourceMode != QueueDisposable.ASYNC) {
                //把被观察者发送过来的数据设置到queue队列中
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        //Scheduler.java
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
  1. ObserveOnObserver的onNext方法中,先把被观察者发送过来的数据设置到queue队列中。
  2. 执行schedule(),调用HandlerScheduler子类HandlerWorker的schedule()方法。

HandlerScheduler$HandlerWorker#schedule()

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...
            
            //run对象为ObservableObserveOn.ObserveOnObserver
            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
  1. 变量run为ObservableObserveOn的子类ObserveOnObserver对象。
  2. handler为主线程的handler,所以run对象的run方法会执行在主线程中。
  3. HandlerWorker的schedule就是通过主线程的handler发送message,实现线程的切换。

ObservableObserveOn$ObserveOnObserver#run()

        @Override
        public void run() {
                ...
                drainNormal();
                ...
        }

        void drainNormal() {
            ...

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    ...
                    try {
                        //不断从队列q中取出数据
                        v = q.poll();
                    } catch (Throwable ex) {
                        ...
                    }
        
                    ...
                    回调使用例子中的observer的onNext方法。
                    a.onNext(v);
                }
                ...
            }
        }

1.ObserveOnObserver的run方法运行在主线程中,先调用drainNormal方法。

  1. drainNormal方法中,先从队列q中取出上游事件发送下来的数据,然后调用变量a的onNext方法,变量a是使用例子中的observer变量。

总结:
1、Schedulers.io()会得到IOScheduler对象。
2、subscribeOn(Schedulers.io())方法会通过IOScheduler创建EventLoopWorker对象,在通过EventLoopWorker对象来获取ThreadWorker对象,然后调用ThreadWorker对象的schedule()方法。
3、ThreadWorker的构造方法中会同时创建executor变量,executor为定长线程池,schedule()中会通过executor线程池来执行observable的subscribe()方法,工作线程为子线程。
4、AndroidSchedulers.mainThread()会得到HandlerScheduler对象,同时获取主线程的handler对象。
5.、事件流下游的ObservableObserveOn子类ObserveOnObserver获取到上游observable发送的数据后,会先放入到数据队列中。
6、observeOn(AndroidSchedulers.mainThread())方法会通过HandlerScheduler来创建HandlerWorker对象,然后调用HandlerWorker的schedule(Runnable)方法。
7、HandlerWorker的构造方法中会获取到HandlerScheduler传递的handler对象,然后HandlerWorker的schedule(Runnable)方法,会通过主线程的handler来包装ObserveOnObserver成为message并发送给主线程的消息队列,通过此操作来把观察者的工作线程切换到主线程。
8、ObservableObserveOn子类ObserveOnObserver的run方法就会运行在主线程,然后取出数据队列q中的数据,调用observer的onNext()方法完成整个事件流的发送和线程的切换。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容