RxJava2源码(三)

subscribeOn

  1. 找到ObservableSubscribeOn类

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        // 传入上游的Observable和调度器Scheduler
        this.scheduler = scheduler;
    }
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 用下游的observer创建一个新的observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
        // scheduler直接执行SubscribeTask
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
  2. 查看SubscribeOnObserver类,除了实现Disposable接口还实现了Observer接口,说明他还是个新的observer;

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
    
        final AtomicReference<Disposable> s;
    
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;// 下游的observer
            this.s = new AtomicReference<Disposable>();
        }
    
        @Override
        public void onSubscribe(Disposable s) {
            // 上游的onSubscribe会调用,但是因为this.s的disposable不为null,大部分情况一直都是直接跳过
            DisposableHelper.setOnce(this.s, s);
        }
    
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
    
        @Override
        public void onComplete() {
            actual.onComplete();
        }
    
        @Override
        public void dispose() {
            DisposableHelper.dispose(s); // 这里dispose多了个步骤,没明白????
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
    
  3. 查看SubscribeTask,这是个Runnable,并且是ObservableSubscribeOn的内部类

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            // 上游的observable执行subscribe,传入的是下游的obser
            source.subscribe(parent);
        }
    }
    
  4. 查看scheduler.scheduleDirect,这个方法就是用scheduler立即执行传入的Runnable任务

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
  5. 所以总结下

    1. ObservableSubscribeOn起到一个桥接的功能,执行source.subscribe(parent),处理上游的Observable
    2. 传入的scheduler用来控制怎么执行

observeOn

  1. 找到ObservableObserveOn类

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler; // 调度器
            this.delayError = delayError;// 
            this.bufferSize = bufferSize;// 任务队列大小,默认128
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                // 如果是TrampolineScheduler,放个屁啥都不干?
                source.subscribe(observer);
            } else {
                // scheduler.createWorker
                Scheduler.Worker w = scheduler.createWorker();
                // 下游的observer包装成ObserveOnObserver,传给上游的source.subscribe
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
  2. 查看ObserveOnObserver类:AtomicInteger的子类,实现了QueueDisposable,Observer<T>, Runnable

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
    
        SimpleQueue<T> queue; // 一个队列
    
        Disposable s;
    
        Throwable error;
        volatile boolean done;// 标记是否结束,和disposable不一样的是如果done为true,还是会走OnComplete或者onError
        volatile boolean cancelled;// 标记disposable状态
    
        int sourceMode;
    
        boolean outputFused;
    
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
  3. 查看ObserveOnObserver类的onSubscribe方法

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
    // 这里省略了一部分FuseMode的代码,默认情况不会走
            // 初始化队列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            // 下游observer的onSubscribe回调
            actual.onSubscribe(this);
        }
    }
    
  4. 查看disposable的逻辑,cancelled变量标记,dispose同时还会清空队列和dispose任务worker

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            s.dispose();
            worker.dispose();// 中断Scheduler任务
            if (getAndIncrement() == 0) {
                // 清空队列
                queue.clear();
            }
        }
    }
    
    @Override
    public boolean isDisposed() {
        return cancelled;
    }
    
  5. 查看ObserveOnObserver的onNext,把数据塞到队列里,并且只有AtomicInteger值为0,才执行任务worker.schedule

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != QueueDisposable.ASYNC) {
            // 入队列
            queue.offer(t);
        }
        schedule();
    }
    
    void schedule() {
        if (getAndIncrement() == 0) {
            // AtomicInteger的值为0就执行worker.schedule,传入的this是Runnable;AtomicInteger加1
            worker.schedule(this);
        }
    }
    
  6. ObserveOnObserver的Runnable实现run方法

    @Override
    public void run() {
        if (outputFused) {
            // 默认为false,先不管
            drainFused();
        } else {
            drainNormal();
        }
    }
    
    void drainNormal() {
        int missed = 1;// 这里成功接收一次数据missed就加1,默认为1
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                // disposable或者done的情况会返回true
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();// 取出队列一个数据v
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    // 再次检查是否已经done或者diposable
                    return;
                }
    
                if (empty) {
                    // 如果队列空了跳出里面的for循环
                    break;
                }
                // 下游的Observer回调onNext
                a.onNext(v);
            }
    
            // 更新missed值,表示还剩几个走了schedule但是还没有被调用onNext的任务
            missed = addAndGet(-missed);
            if (missed == 0) {
                // missed为0跳出for循环
                break;
            }
        }
    }
    

    这里解释下为什么要两个for循环和为什么用AtomicInteger标记schedule次数,因为要考虑上游事件发送和下游事件接受速度是不一样,而且worker.schedule导致上下游不在一个线程,比如下面几个例子

    1. 发送数据(1,2,3,4)很快,接收数据很慢:那么异步情况下,很快的会调用四次schedule,getAndIncrement只有第一次为0,只会走一次worker.schedule(this),那么run方法就只会走一次,会在外面的for循环跳出,也就是missed=0结束
    2. 发送数据(1,2,3,4,complete)很快,接收数据很慢:基本同上,但是因为发送了complete导致状态为disposable,会在里面的for循环return,因为checkTerminated(d, empty, a)返回了true
    3. 发送数据(1,2)很慢,发送数据(3,4)很快,接收数据(2)很慢:那么异步情况下,有可能在2执行完onNext,刚刚跳出里面的for循环,这时候发送数据3了导致队列不为空,miss不为空了所以不会跳出外面的for循环。这样就不用worker.schedule(this);
  7. 总结下

    1. 同样是桥接,这里对下游的observer做处理
    2. 传入的scheduler用来控制怎么执行

Example

下面代码是我们很常见的一个例子,发送数据(1,2)在IO线程执行,Observer在UI线程中执行

Observable.just(1,2)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("yj", "---onSubscribe==" + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull Integer i) {
                Log.e("yj", "---onNext==" + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("yj", "---onError==" + e);
            }

            @Override
            public void onComplete() {
                Log.e("yj", "---onComplete");
            }
        });
  1. 依次创建了三个Observable:ObservableFromArray,ObservableSubscribeOn,ObservableObserveOn
  2. 从下往上看subscribeActual方法调用:ObservableObserveOn不做处理,ObservableSubscribeOn使subscribe在io线程中执行,ObservableFromArray顺序发送1,2
  3. Observer的回调只有ObservableObserveOn处理了,使其在UI线程中被调用

再举个的例子

下面的代码执行了两次subscribeOn和observeOn,那么just和accept在哪个线程执行呢?

Observable.just(1,2)
        .subscribeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                
            }
        });

答案:在两个不同的IO线程

PS

我的github:https://github.com/nppp1990/MyTips

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容