Rxjava(二)——线程切换怎么实现的?

接上文Rxjava(一)——链式调用怎么实现的?
在分析线程切换原理前要明白几个概念;

线程调用的关键操作符subscribeOn、observeOn

observeOn作用:影响后续操作符所在的线程,直到下个observeOn设置为其他线程;
subscribeOn作用:初始化整个链条所在的线程,多次设置只有第一次生效;

线程调度器 Schedulers

Rxjava里面将常用线程归纳为4种,即有4 种调度器:

  • 主线程 AndroidSchedulers.mainThread();
  • io线程 Schedulers.io()
  • 计算线程 Schedulers.computation()
  • 新建线程 Schedulers.newThread()

同样以一个实例来进行分析:

    Observable.just("a")
               .observeOn(Schedulers.computation())
                .map(new Func1<String, String>() {  //操作1
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
                        return s + s;
                    }
                })
                 .observeOn(Schedulers.io())
                .map(new Func1<String, String>() { //操作2
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":second--" + s+"\n");
                        return s + s;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Subscriber<String>() {//操作3
                    @Override
                    public void onCompleted() {
                        System.out.print(Thread.currentThread().getName()+"\n");
                        System.out.print("completed"+"\n");

                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("error");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });

上述实例中,分别使用了computation线程、io线程和新线程,执行代码如下:


image.png

由于subscribeOn是在整个调用链之前,其作用于整个链条,而observeOn只作用此它操作符之后,因此上图结束RxComputationThreadPool-1 在计算线程中,而之后的全部都处理io()线程RxCachedThreadScheduler-1中。

正式开始撸代码

带着几个问题来跟读代码:

  • 1 Observable.just("a")生成的Observable对象,如何调用到计算线程中,线程切换通过什么实现的?
  • 2 为什么subscribeOn()是对整个调用链条起作用?

问题1:Observable.just("a")生成的Observable对象,如何调用到计算线程中,线程切换通过什么实现的?

Observable.just("a").observeOn(Schedulers.computation())

由于just发送的单个对象,因此Observable使用的创建对象为ScalarSynchronousObservable;在其初始化对象时,将"a"作为构造参数传入,并保存。
observeOn操作符会首先对Observable的类型进行检测,若为ScalarSynchronousObservable类型,则通过ScalarSynchronousObservable@scalarScheduleOn来实现在某个线程中调度的过程;
跟进ScalarSynchronousObservable类,

    public Observable<T> scalarScheduleOn(Scheduler scheduler) {
        if (scheduler instanceof EventLoopsScheduler) {
            EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
            return create(new DirectScheduledEmission<T>(es, t));
        }
        return create(new NormalScheduledEmission<T>(scheduler, t));
    }

很显然,使用DirectScheduledEmission,通过call,完成计算线程池的直接调度。

  static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
        private final EventLoopsScheduler es;
        private final T value;
        DirectScheduledEmission(EventLoopsScheduler es, T value) {
            this.es = es;
            this.value = value;
        }
        @Override
        public void call(final Subscriber<? super T> child) {
            child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
        }
    }
    //EventLoopsScheduler.class
    public Subscription scheduleDirect(Action0 action) {
       PoolWorker pw = pool.get().getEventLoop();
       return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
    }
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);//执行动作
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

到此时,executor.submit(run);执行,切换到对应线程上完成Action的调用;而Action里面做的什么事, 找到实现方法,发现其call()方法就是将数据传递到下一层去而已:

    /** Action that emits a single value when called. */
    static final class ScalarSynchronousAction<T> implements Action0 {
        private final Subscriber<? super T> subscriber;
        private final T value;

        private ScalarSynchronousAction(Subscriber<? super T> subscriber,
                T value) {
            this.subscriber = subscriber;
            this.value = value;
        }

        @Override
        public void call() {
            try {
                subscriber.onNext(value);
            } catch (Throwable t) {
                subscriber.onError(t);
                return;
            }
            subscriber.onCompleted();
        }
    }

问题一到此,就分析得差不多了。

问题2: 为什么subscribeOn()是对整个调用链条起作用?

 return nest().lift(new OperatorSubscribeOn<T>(scheduler));

能对整个调用链起作用的关键点是nest();为什么这么说,看其实现;

  public final Observable<Observable<T>> nest() {
        return just(this);
    }

在此,可以看到其实还是使用的just,但是,关键是发送的观察者是this;发送this,就意味着subscribeOn所在的Observable对象发送了出去。this所代表的对象将会作为一个嵌套链表嵌入到subscribeOn所产生的新的Observable调用链中;
那么this如何嵌入到新的Observable中的呢?
查看OperatorSubscribeOn类源码,关键代码o.unsafeSubscribe

  @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

此处的o对象就是nest发送的this,通过unsafeSubscribe函数,重新形成调用链;
此处在执行的同时还存在线程切换,即inner.schedule。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容