拆轮子系列--RxJava理解(三)--observeOn

本系列文章如下:

上一篇文章主要介绍了RxJava中线程调度的核心方法之一subscribeOn,本篇文章继续分析RxJava中线程调度的另一个核心方法--observeOn。本篇文章基于RxJava2源码进行分析。
本文的大纲如下:

  • 一个具体的例子
  • observeOn源码分析
  • 总结

1 .一个具体的例子

首先,以一个具体的例子分析observeOn的原理:

Observable.create(new ObservableOnSubscribe<String>() {
     @Override
     public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                Thread.sleep(1000);
                e.onNext("2");
                Thread.sleep(1000);
                e.onComplete();
            }
        })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.e("TAG", "map1--thread=" + Thread.currentThread().getName() + "-s:" + s);
                        return Integer.valueOf(s);
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Long>() {
                 @Override
                public Long apply(Integer integer) throws Exception {
                        Log.e("TAG", "map2--thread=" + Thread.currentThread().getName() + "-integer:" + integer);
                        return Long.valueOf(integer);
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<Long, String>() {
                    @Override
                   public String apply(Long aLong) throws Exception {
                        Log.e("TAG", "map3--thread=" + Thread.currentThread().getName() + "-aLong:" + aLong);
                        return String.valueOf(aLong);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("TAG", "Consumer--thread=" + Thread.currentThread().getName() + "-String:" + s);
                    }
                });

如果你了解map这个操作符,那么这个例子你很快就能得运行结果,如果你对于map这个操作符不太清楚,建议回顾下之前的文章拆轮子系列--RxJava理解(一)--Map解析。接下来我们看看本例的程序运行结果:

E/TAG: map1--thread-main-s:1
E/TAG: map2--thread-main-integer:1
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:1
E/TAG: map1--thread-main-s:2
E/TAG: map2--thread-main-integer:2
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:2

细看下之前的例子,可能有些朋友已经发现了一个异常操作Thread.sleep(1000);。为什么在发射元素的时候睡了一秒钟?这个是为什么呢?哈哈,先不急,下文将一一道来。
从上面运行的结果我们发现,除了observeOn()下面的部分运行在observeOn()指定的线程中,其余的部分运行在subscribeOn()指定的线程,这个是为什么呢?下面再分析,这里先给个结论:RxJava中,observeOn()是用来指定下游observer回调发生的线程。对应上面的例子,也就是map3与Consumer运行的线程。

2. observeOn源码分析

为什么会产生上面的结果?我们来看看源码:

@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

从源码中我们可以看出,调用observeOn()方法返回了一个Observable对象,而真正的操作是在ObservableObserveOn()这个方法里面,接下来我们看看ObservableObserveOn()这个方法到底干了什么事情:

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

我们主要看看ObservableObserveOn中主要的实现方法subscribeActual()。在这个方法中,首先创建了一个指定的事物worker,然后将worker作为参数创建了一个ObserveOnObserver对象,接下来我们分析这个ObserveOnObserver中具体的逻辑:

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        ...

ObserveOnObserver实现了Observer这个接口,重写了Observer里面的方法,我们看看主要的方法onNext()。在该方法中,首先会向queue()中添加元素,我们主要关注schedule()这个方法,进入schedule()

void schedule() {
       if (getAndIncrement() == 0) {
             worker.schedule(this);
       }
  }

上述方法将实现了Runnable接口的ObserveOnObserver对象放入了worker里面进行操作,直白的说,就是该ObserveOnObserver对象的操作会被放入一个线程池中,寻找合适的线程运行。
主要的问题来了,当ObserveOnObserver对象寻找到一条线程后执行了什么操作呢?继续看源码:

 @Override
   public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        //我们主要看看drainNormal()这个方法:
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

其实这个方法就是一个死循环,它不断的从queue取出元素然后交给由下一级传递上来的observer来执行onNext()方法。而这整个从queue中取元素到由下级的observer执行onNext()方法,都是执行在scheduler( Scheduler.Worker w = scheduler.createWorker();)所指定的线程中。总的来说,ObserveOnObserver会将下一级传递过来的observer进行封装,让它独立的运行在scheduler指定的线程中去处理元素。
再回到前面的例子,我们在observeOn()操作符后面接着使用了一个map()操作符,那么此时的流程又是怎么样的呢?我们以一张图来进行说明:

observerOn执行流程.png

从上图中可以看到,observeOn后面跟了一个map(),那么在drainNormal ()方法中a.onNext(v)a就是经过map转换过的observer,接着调用mapo.onNext(transformer.call(t)),此时保证了transformer.call()方法运行在observeOn()所指定的线程中,而o就是observer2

3. 总结

使用observeOn()这个操作符,会在原来Observer发射元素的时候,将元素一个个的添加到一个指定的队列中,然后异步(使用一个新的线程)的从该队列中取出元素,将取出的元素交给下一级的observeronNext()方法来处理元素。

回到前面抛出的一个问题,我们在发射元素的时候sleep了1秒钟,这个是为什么呢?说明一下:因为我们取元素的过程是异步操作的,那么很有可能出现某个线程的转换执行完毕之后才执行另一个线程的转换操作,最后与我们期望的结果不太一样。当我们去掉例子中sleep()操作,其结果如下:

E/TAG: map1--thread=main-s:1
E/TAG: map2--thread=main-integer:1
E/TAG: map1--thread=main-s:2
E/TAG: map2--thread=main-integer:2
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:1
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:2

好了,关于RxJava中线程调度的核心方法observeOn操作符已经介绍完毕。

如果文章中有什么疏漏或者错误的地方,还望各位指正,你们的监督是我最大的动力,谢谢!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343