RxJava2.0线程切换原理

image.png
前言
理解线程切换原理有什么意义?
1、可以清楚的知道这个线程切换操作影响到哪些代码的执行线程,不会影响到哪些代码的执行线程
2、灵活运用线程切换来实现复杂的应用场景
3、有利于在发生线程相关的问题时进行调试
不指定线程
在Observable中根据id去获取Drawable 对象,将获取到Drawable 对象发送给下游的Observer。
Observable.create(ObservableOnSubscribe<Drawable> { e->
            run {
                val drawable = theme.getDrawable(drawableRes)
                e.onNext(drawable)
            }
        }).subscribe(Consumer { s->  run {
            imageView!!.setImageDrawable(s)
        }})
RxJava遵循的是线程不变换的原则,在那个线程调用了subscribe(),就在哪个线程生产事件e.next()
上面代码片段中没有涉及到线程切换,下面我们首先谈谈
subscribeOn() 和 observeOn()
Observable.create(ObservableOnSubscribe<Drawable> { e->
            run {
                val drawable = theme.getDrawable(drawableRes)
                e.onNext(drawable)
            }
        }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(Consumer { s->  run {
            imageView!!.setImageDrawable(s)
        }})
subscribeOn()------->指定上面的代码执行线程
即:代码片段:
Observable.create(ObservableOnSubscribe<Drawable> { e->
            run {
                val drawable = theme.getDrawable(drawableRes)
                e.onNext(drawable)
            }
        });
observeOn()------->指定下面的代码执行线程
即:代码片段:
Observable.subscribe(Consumer { s->  run {
            imageView!!.setImageDrawable(s)
        }})
How?先一个一个来说:
1、subscribeOn(Schedulers.io())
在Schedulers有很多策略模式:比如 Schedulers.io()Schedulers.newThread(),Schedulers.computation()等等还有其他。
先看一下这块的关系:
  @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
追到:Schedulers类
IO = RxJavaPlugins.initIoScheduler(new IOTask());
走进IOTask:
 static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
IOTask是一个实现Callable接口的静态类不类,这个看着比较眼熟:
  static class WorkThread implements Callable<String>{
        @Override
        public String call() throws Exception {
            System.out.println("WorkThread");
            return "sucess";
        }
    }
一个有返回值的任务,启动方式给之前的线程有点不一样,与AsnyTask的用法完全一样,AnsyTask目前我们先不分析,后面会和大家一一讲到。
  WorkThread workThread =new WorkThread();
  FutureTask task =new FutureTask(workThread);
  new Thread(task).start();
image.png
我们接着看:
  static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
  }
从上述可以总结出,Scheduler 中策略模式对应的关系,如下所示:
image.png
Schedulers.newThread()代码和Schedulers.IO()区别不是不大,就不贴出来了,有意向的老铁可以去看这块的代码。
接着看Observable.subscribeOn(Schedulers.io())
  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
  }
传入ObservableSubscribeOn(自定义资源)对象前面文章有讲过
ObservableSubscribeOn类
 @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
Scheduler类
 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
上面代码片段中创建了Worker对象,看看到底干了什么,通过Work对象我们来到了NewThreadWorker类中
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
  }
看到ScheduledExecutorService对象有什么感想,线程池 对的没错,从Scheduler对象中可以看出,NewThreadWorker ,DisposeTask拿到了线程池对象,创建DisposeTask对象,DisposeTask是一个线程,给线程添加了Disposable中断功能,通传进来的线程进行层层封装之后调用, w.schedule(task, delay, unit);方法提交到线程池让线程执行,所有在上面调用subscribeOn(Schedulers.io())将自定义资源执行的线程为子线程,中去对应的资源文件。最后补充完整这张图:
image.png
2、.observeOn(AndroidSchedulers.mainThread())
在observeOn()中传入AndroidSchedulers.mainThread(),给前面也是一样的AndroidSchedulers类
 private static final class MainHolder {
      static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    } 
再看看ObservableObserveOn对象需要特别注意的是
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
将传递Observer进行包装得到ObserveOnObserver对象,
 ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
在ObserveOnObserver对象中有worker 对象,同样的Work是一个抽象类,派生出了HandlerWorker对象:HandlerWorker
     @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this;

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
HandlerWorker代码片段中出现了handler对象,调用 handler.sendMessageDelayed(message, unit.toMillis(delay));方法将线程池中执行完之后的数据进行发送,但是有个问题,只是看到数据在发送但是没有看到那个地方在接收数据,于是回想自定义资源中持有包装后的Observer对象,那我们这包装后的Observer对象就是ObserveOnObserver对象,我在ObserverOnObserver对象中找到接受的数据的方法:
void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
最后调用drainNormal()中 a.onNext(v);将拿到数据传递到下游,整个两次线程切换的大致流程图如图所示:
image.png
总结:
总的来说,RxJava的处理顺序像一条流水线,这不仅仅是代码写起来像一条链上,当你切换流水的流向(线程),整条链都改变了方向,并不会进行分流。
1、observeOn() 只是在收到 onNext() 等消息的时候改变了从下一个开始的操作符的线程运行环境。
2、subscribeOn() 线程切换是在 subscribe() 订阅的时候切换,他会切换他下面订阅的操作符的运行环境,因为订阅的过程是自下而上的,所以第一个出现的 subscribeOn() 操作符反而是最后一次运行的。
不得不说 Handler 在安卓中的地位真的是很牛逼。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342