RxJava2 线程切换分析

看了很多学习Rxjava2的Demo,觉得最好的是这个。
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
这里分析下RxJava2 线程是怎么切换的。分析RxJava2最麻烦的地方在于它的封装类太多,我们只抓主线就行了。首先看RxJava2 线程切换的例子:

getObservable()
.subscribeOn(Schedulers.io())//切换到io线程
.observeOn(AndroidSchedulers.mainThread(), true)//切换到主线程
.subscribe(getObserver());

一、subscribeOn分析
我们从subscribeOn开始分析,传入的参数是Schedulers.io(),源码是:

@NonNull
public static Scheduler io() {
   return RxJavaPlugins.onIoScheduler(IO);
}

RxJava2源码中有大量和RxJavaPlugins相关的代码,我们先不管RxJavaPlugins是干什么的,可以简单认为RxJavaPlugins只是wrap了一层,比如简单认为RxJavaPlugins.onIoScheduler(IO)返回就是IO,这样对主流程没影响。
IO是什么,追踪源码看到:

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        });

static final class IoHolder {
     static final Scheduler DEFAULT = new IoScheduler();
}

public final class IoScheduler extends Scheduler{
        ........
}

原来IO就是 IoScheduler,先看到这里,再回到subscribeOn方法本身。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>
                 (this, scheduler));//返回一个ObservableSubscribeOn对象
}

进入ObservableSubscribeOn类,

public final class ObservableSubscribeOn<T> extends 
       AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, 
       Scheduler scheduler) {
        super(source);//source就是最初的Observable
        this.scheduler = scheduler;//scheduler就是最初传进来的IoScheduler
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new 
        SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
....................
}

可见ObservableSubscribeOn只是把最初的Observable和IO封装了下。
这里说下本文开头例子的执行流程,当getObservable()最后执行subscribe(getObserver());时,会走到上面的subscribeActual方法,为什么会走到这里可以查源码相关部分,这里不分析。来看下subscribeActual做了什么。方法里重点在这句

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
       @Override
       public void run() {
             source.subscribe(parent);//重点在这里,封装成了Runnable,发送数据
       }
}));

上面的IoScheduler执行了scheduleDirect方法,并且把source.subscribe(parent);这个发送数据动作封装成了个Runnable。看到这里,猜到IoScheduler里有个线程来执行这个Runnable,这样就把线程给切换了。
来看看IoScheduler的scheduleDirect方法,IoScheduler类没有这个方法,scheduleDirect方法存在于基类Scheduler里,经过一个重载方法,最终走到这里:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }

看到这里,猜测Worker w里有线程来跑这个wrap过的decoratedRun。
createWorker()是IoScheduler类自身的方法:

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

EventLoopWorker类的主要方法如下:

 //构造函数
EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
}
//上面scheduleDirect方法里Worker要调用这个方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, 
                   tasks);
}

可见把跑Runnable的任务交给了threadWorker,看下threadWorker到底是什么:

static final class ThreadWorker extends NewThreadWorker{
           .............
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
         private final ScheduledExecutorService executor;
         public NewThreadWorker(ThreadFactory threadFactory) {
                executor = SchedulerPoolFactory.create(threadFactory);
         }

        //删除了一些判断代码,主干代码如下
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
              Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
              ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, 
        parent);

              if (delayTime <= 0) {
                     f = executor.submit((Callable<Object>)sr);
               } else {
                     f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
               return sr;
           }
}

可见Runnable经过wrap后,最终交给了ScheduledExecutorService去执行。
到这里可以总结下,所谓线程切换只是将要跑的任务封装成Runnable,然后把Runnable交给ExecutorService去执行,源码看起来复杂的地方就在于一层层的往下传这个Runnable给真正执行的对象。

题外的分析
上面提到Runnable给ThreadWorker去跑,ThreadWorker从哪里来?看上面EventLoopWorker的构造函数源码,从里面看到threadWorker从CachedWorkerPool里得到:

this.threadWorker = pool.get();

CachedWorkerPool的主要代码如下:

    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> 
                                   expiringWorkerQueue;
        final CompositeDisposable allWorkers;
 
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();//出队
                if (threadWorker != null) {
                    return threadWorker;
                }
            }
            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);//新构建
            allWorkers.add(w);
            return w;
        }
}

可以看到ThreadWorker先从expiringWorkerQueue中取,如果队列是空的,就创建个新的ThreadWorker。
那啥时候把ThreadWorker放入expiringWorkerQueue队列呢?看代码:

 void release(ThreadWorker threadWorker) {
     // Refresh expire time before putting worker back in pool
    threadWorker.setExpirationTime(now() + keepAliveTime);
     expiringWorkerQueue.offer(threadWorker);//加入队列
}

谁调用release呢?就是上面的EventLoopWorker的dispose方法:

    @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

               // releasing the pool should be the last action
                // should prevent pool reuse in case there is a blocking
                // action not responding to cancellation
//                threadWorker.scheduleDirect(() -> {
//                    pool.release(threadWorker);
//                }, 0, TimeUnit.MILLISECONDS);

                pool.release(threadWorker);
            }
        }

二、observeOn分析
observeOn的流程和subscribeOn差不多,也是封装成了Runnable交给Scheduler,Scheduler再找对象去执行,例子里是Scheduler找Handler来实现的,下面来分析。来看AndroidSchedulers.mainThread(),MAIN_THREAD其实被封装过的DEFAULT这个Scheduler:

public static Scheduler mainThread() {
     return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

可以看到HandlerScheduler传入的参数是个MainLooper的Handler,这里就可以猜到RxJava2是用这个Handler将线程切换到主线程上。来看看HandlerScheduler源码:

final class HandlerScheduler extends Scheduler {
   public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
   public Worker createWorker() {
        return new HandlerWorker(handler);
   }
   private static final class HandlerWorker extends Worker {
       //删除一些判断代码,只看主干代码
       @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) 
         {
            ScheduledRunnable scheduled = new 
              ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
           handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            return scheduled;
        }
   }
     ................
}

可以看到HandlerWorker结构非常类似在分析subscribeOn时最后碰到的NewThreadWorker,任务Runnable经过封装后最后都是交给这些Worker的schedule方法来执行。这里是通过Handler机制来运行Runnable的,上面代码很多,其实只用看这句:

Message message = Message.obtain(handler, scheduled);//获得Message,scheduled作为这个Message的callback变量

Message被handler发送出去,scheduled这个callback Runnable就在MainLooper所在线程里执行了。
题外的分析-Handler机制
下面简要分析message被handler发送后,scheduled这个callback Runnable怎么执行的。
Handler里有个Looper对象,Looper里有个MessageQueue(消息队列),Looper里有个loop()方法不断从MessageQueue取Message,源码精简后的流程如下:

public final class Looper {
      //通过ThreadLocal来存取Looper,每个线程只有一个Looper
      public static @Nullable Looper myLooper() {
             return sThreadLocal.get();
       }
      public static void loop() {
            final Looper me = myLooper();//获得Looper
            final MessageQueue queue = me.mQueue;//获得Looper里的
            MessageQueue
            for (;;) {
                   Message msg = queue.next();//获取消息队列的每个消息
                   msg.target.dispatchMessage(msg); //重点来了,target就是handler
            }
       }
       ....................
}
public class Handler {
      public void dispatchMessage(Message msg) {
               if (msg.callback != null) {//判断msg有callback先执行
                      handleCallback(msg);
                } else {
                      if (mCallback != null) {
                              if (mCallback.handleMessage(msg)) {
                               return;
                       }
            }
            handleMessage(msg);
      }
      private static void handleCallback(Message message) {
              message.callback.run();//Runnable执行了
       }
        ......................
    }
}

再次提醒,callback就是前面要执行的ScheduledRunnable,这样ScheduledRunnable就在Handler的Looper所在线程中执行了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,195评论 25 707
  • RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如 被观察者(Obser...
    fengzhizi715阅读 4,942评论 0 8
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,478评论 7 62
  • 我们知道,多线程是Android开发中必现的场景,很多原生API和开源项目都有多线程的内容,这里简单总结和探讨一下...
    蓝灰_q阅读 11,872评论 0 12
  • 天上的星星亮闪闪,我不是其中一颗。 天上的月亮圆又圆,它只是一颗小星星。 星星看我,正如我看它么?我看着它那小小的...
    后夏夕颜心静如水阅读 1,300评论 2 6