RxJava源码分析(四)线程切换observeOn

引言

前面的文章我们走完了订阅方法线程切换的实现,今天我们来看观察方法的线程切换。

线程调度observeOn

 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<String>() {...}

接着看observeOn方法:

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ....
         //返回ObservableObserveOn对象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

observeOn操作符返回了ObservableObserveOn对象,这个类比较长,我们仍然只梳理核心部分:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    //调用层传入的线程调度器
    final Scheduler scheduler;
    //默认false
    final boolean delayError;
    //默认128
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // false
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //1 创建出一个对应的Worker
            //连续切换观察者线程时候,scheduler发生变化,observer的方法就会交给新设定的Scheduler.Worker执行.
            Scheduler.Worker w = scheduler.createWorker();
            //2 订阅上游数据源, 封装了传入的observer,构造ObserveOnObserver对象并订阅它
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这段代码做了两件事情:

1.创建一个Scheduler对应的Worker;
2.根据传入的Observer对象构造代理类对象ObserveOnObserver,在这个类中通过Worker实现观察方法的线程切换;

接下来我们看核心类ObserveOnObserver。

观察方法线程切换核心实现ObserveOnObserver

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        //调用层传入的观察者,也是观察方法的最终调用者
        final Observer<? super T> actual;
        //对应Scheduler里的Worker
        final Scheduler.Worker worker;
        //上游被观察者发送过来的数据都存在这里
        SimpleQueue<T> queue;
        Disposable s;
        Throwable error;
        //是否完成
        volatile boolean done;
        //是否取消
        volatile boolean cancelled;
        // 代表同步发送 异步发送 
        int sourceMode;
        ....
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                ...
                //创建一个queue 用于保存上游 onNext()发送的数据,SpscLinkedArrayQueue根据注释解释,是一个实现了单生产-单消费模型的线程安全队列。
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //回调下游观察者onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            //1 执行过error / complete 会是true
            if (done) {
                return;
            }
            //2 如果数据源类型不是异步的, 默认不是
            if (sourceMode != QueueDisposable.ASYNC) {
                //3 将上游push过来的数据 加入 queue里
                queue.offer(t);
            }
            //4 开始进入对应Workder线程,在线程里 将queue里的t 取出 发送给下游Observer
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            //给error存个值 
            error = t;
            done = true;
            //开始调度
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            //开始调度
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
               //交给worker调度run()方法
                worker.schedule(this);
            }
        }
        //从这里开始,这个方法已经是在Workder对应的线程里执行的了
        @Override
        public void run() {
            //默认是false
            if (outputFused) {
                drainFused();
            } else {
                //取出queue里的数据 发送
                drainNormal();
            }
        }

        //循环从队列取数据,交给观察者
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                // 1 如果已经 终止 或者queue空,则跳出函数,
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        //2 从queue里取出一个值
                        v = q.poll();
                    } catch (Throwable ex) {
                        //3 异常处理 并跳出函数
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    //4 再次检查 是否 终止  如果满足条件 跳出函数
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //5 上游还没结束数据发送,但是这边处理的队列已经是空的,不会push给下游 Observer
                    if (empty) {
                        //仅仅是结束这次循环,不发送这个数据而已,并不会跳出函数
                        break;
                    }
                    //6 发送给观察者
                    a.onNext(v);
                }
                 ...
            }
        }

        //检查 是否 已经 结束(error complete), 是否没数据要发送了(empty 空), 
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            //如果已经disposed 
            if (cancelled) {
                queue.clear();
                return true;
            }
            // 如果已经结束
            if (d) {
                Throwable e = error;
                //如果是延迟发送错误
                if (delayError) {
                    //如果空
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        //停止worker(线程)
                        worker.dispose();
                        return true;
                    }
                } else {
                    //发送错误
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    //发送complete
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }

主要部分都加了注释,这里总结一下:
1.ObserveOnObserver实现了Observer和Runnable接口,封装调用层传入的观察者的对象;
2.在onNext()里,先不切换线程,将数据加入队列queue。然后开始切换线程,在另一线程中,从queue里取出数据,push给下游Observer;
3.observeOn()影响的是其下游的代码,subscribeActual方法就会给Observer方法分配新的调度器,所以多次调用仍然生效。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容