Rxjava加载流程浅析

基本用法

         Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(123);
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                LogUtils.d("onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                LogUtils.d("onComplete");
            }
        };
        observable.subscribe(observer); 

我们以这段简单的代码为基础,讲解下贯穿整个ReactiveX设计的四个概念:观察者被观察者事件订阅

  • 观察者
    对事件进行响应的对象,也可以称作消费者,在上述的代码中,subscirbe方法有很多个重载,其参数要么是一个Consumer对象,要么是一个Observer对象,Consumer对象后续也会被包装成一个LambdaObserver对象。因此可以说subscribe方法的参数即为Observer(观察者)。
  • 被观察者
    产生事件的对象,也可以称作生产者,在上述代码中,Observable.create(...)返回的是一个Observable对象,即为这段程序的被观察者(生产者)。
  • 事件
    RxJava中存在四种事件流:onSubscribe(订阅事件),onNext(正常事件),onError(异常事件),onComplete(完成事件)。在上述代码中,是将一个整型元素作为onNext事件中的数据进行发送。
  • 订阅
    创建观察者与被观察者之间观察关系,对应着上述代码中的subscribe()方法。RxJava的事件驱动模型是一种“拉模型”,在观察者没有进行事件订阅之前是不会有事件产生的,只有观察者进行订阅后,才会触发被观察者生产事件。

集大成者Observable

在整个数据处理的过程中,Observable可以说是最重要的一个对象。客户端(消息的生产者或者消费者)只和Observable进行交互,观察者和被观察者之间关系的创建也是由Observable去实现,而不用我们显示的编码实现,这大大降低了我们使用观察者模式的成本。

image

从图中我们可以看出:

  • Observable实现了ObservableSource接口,该接口中只有一个方法:subscribe()。从字面意思就可以理解,这是一个提供观察能力的接口,所以Observable的一大能力是供观察者进行事件订阅,而进行事件订阅的方法实现就是调用Observable的subscribe()方法

  • Observable是一个抽象类,它提供了subscribeActual模板方法供子类实现,从源码中可以看出,Observable的subscribe()方法最终会委托子类的subscribeActual()方法实现,这个方法会建立生产者与消费者之间的关联关系。

  • 除此之外,Observable还是一个工厂类,它提供了静态方法fromArray()create()等用来创建具体的可观察对象,同时还提供了flatMap()concatMap()等操作方法对可观察对象进行包装。

Observable的存在让生产者和消费者完全的解耦了,生产者只需关注自己生成何种Observable对象,而消费者也只需关注自己观察的是哪种Observable。

在实际的应用中,Rxjava已经提供了各种各样的操作符供我们使用,生产者只需要调用Observable中相应的方法即可以生成所需的可观察对象,供消费者进行事件订阅。消费者只需调用可观察对象的subscribe()方法即可与生产者建立观察关系,极其方便。

真实的观察

观察者模式是RxJava设计的核心思想,在观察者模式中总是存在观察的对象和被观察的对象,从上文的解析中也可以看出Observable更多的是一个控制器的作用,而并非真正的事件的来源。那么在RxJava中,什么才是真正的生产者,什么才是真正的消费者呢。

我们来分析下以下三种常见的Observable:

Observable arrayObservable = Observable.fromArray(1,2,3,4,5);
Observable createObservable = Observable.create(emmit->emmit.onNext(1));
Observable justObservable = Observable.just(1);

先简单介绍下这几个Observable的作用,fromArray的作用是将数组中的元素作为onNext事件发送,create的作用是发送自定义事件,just的作用是发送单个事件。

上一小节有讲到实际的订阅行为是由各个Observable类中subscribeActual()方法实现的,我们来看下这三个类的subscribeActual()方法。


image

除去细枝末节,这三个方法都可以分成以下三步

1.创建被观察者对象,并传入观察者observer,建立两者的关联关系;
2.触发onSubscribe事件,观察者响应该事件;
3.进行事件的拉取,我们可以进入到d.run()source.subscribe(parent)sd.run()这些方法的内部看一些,可以看到这些方法就是在发送onNext(),onError(),onComplete()等事件。

下图是整个流程中的相关类图。实际事件的发送者是FromArrayDisposable等对象,而实际的观察者,则是一个实现了Observer接口的实体类。如果我们在subscribe时传入的是一个lambda表达式,之后会被包装成一个默认的LambdaObserver对象,进行事件消费。

image

包装

RxJava中提供了丰富的操作符,比如flatMap,concatMap等可以对事件转换,subscribeOn,observableOn等可以对生产和消费的线程进行控制。这些操作符实际上调用了Observable中的包装方法对原有的可观察对象进行包装,返回了一个增强了的可观察对象。

操作符种类繁多,在这就不一一举例,我们以flatMap为例,分析一下这些操作符是如何工作的。

首先,flatMap操作会返回一个ObservableFlatMap对象,在创建这个对象时,会将原始的Observable对象作为构造函数的参数传入。

查看其核心方法subscribeActual,

@Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

可以看到这一类对象的subscribeActual方法和上一节中的方法不太一样,这里面并没有去实际的创建观察关系,而是做了两件事:

1.对观察者进行增强,将其包装成为MergeObserver对象
2.再调用source的subscribe方法,这里source就是前面构造函数中传入的Observable对象,由其再进行观察关系的建立。

下图是RxJava中装饰器模式的相关类图:所有的包装类都继承了AbstractObservableWithUpstream类,该抽象类有一个类型为ObservableSource的成员函数,用来持有被装饰的对象。


image

Observable是支持链式操作的,就和Java 8中的Stream一样,我们来考虑这样一行代码。

        Observable.fromArray(1,2,3,4,5).flatMap(num->Observable.just(num)).observeOn(Schedulers.newThread()).subscribe(num-> System.out.println(num));

我们在分析上面这串代码时,一定会凌乱非常,在看源码时也会看到前面忘掉后面,但是如果我们对RxJava的包装流程足够了解的话,就可以很轻松的对上述代码进行分析。


image

流程浅析

我们以一段常见的代码分析事件产生和消费的流程

 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
System.out.println("onSubscribe");
            }

            @Override
            public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@io.reactivex.annotations.NonNull Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

在分析之前,我们先定义几个概念,source代表着被观察者;downStream代表着观察者;它们在下面的代码将会重复出现。

Observable.create(ObservableOnSubscribe source)

Observable.create(ObservableOnSubscribe source)创建了一个ObservableCreate(ObservableOnSubscribe<T> source)对象,其内部持有了真实的被观察者ObservableOnSubscribe,我们将这个ObservableOnSubscribe称为原始被观察者,它是一个接口,内部定义了subscribe()方法:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

小结:

ObservableCreate持有的sourceObservableOnSubscribe

ObservableCreate.subscribeOn(Schedulers.IO)

接着我们调用了ObservableCreate对象的subscribeOn,并传入了IO线程调度器,subscribeOn()方法的实现在Observable类,ObservableCreate继承了Observable类

#Observable
public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

其实就是创建了ObservableSubscribeOn对象,并传入了ObservableCreate对象和IO线程调度器。查看ObservableSubscribeOn的构造函数:

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

可以看到ObservableSubscribeOn把source传给了父类,
ObservableSubscribeOn继承了AbstractObservableWithUpstream,因此AbstractObservableWithUpstream持有了ObservableCreate对象,但我们可以理解为ObservableSubscribeOn持有了ObservableCreate对象。

小结:

ObservableSubscribeOn持有了sourceObservableCreate

ObservableSubscribeOn.observerOn(AndroidSchedulers.mainThread())

observerOn()方法的实现同样是在Observable类中:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

可以看到也只是创建了一个ObservableObserveOn对象,并传入了ObservableObserveOn对象和主线程调度器,这里的delayErrorbufferSize传入的是默认值,暂且不表,查看ObservableObserveOn的构造函数:

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

这里同样的是把source传给了父类,ObservableObserveOn同样也是继承了AbstractObservableWithUpstream

小结:

ObservableObserveOn持有了sourceObservableSubscribeOn

ObservableObserveOn<T>.subscribe(observer)

接着我们调用了ObservableObserveOn对象的subscribe()方法,并传入了observer,该observer是最外层的Observer<T>的实现类:

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

subscribe(observer)方法的实现是在Observable类中,其内部调用了subscribeActual(observer),subscribeActual(observer)是抽象方法,具体实现是在子类中,查看ObservableObserveOn类的subscribeActual(observer)

# ObservableObserveOn
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

1.创建一个任务,scheduler为AndroidSchedulers(主线程);

createWorker()的实现是在HandlerScheduler中,该方法创建了一个HandlerWorker对象,它ji'c持有一个主线程关联的handler(new Handler(Looper.getMainLooper()));

2.创建一个Observer观察者 => ObserveOnObserver,持有一个传进来的observer和worker,传进来的observer是最开始最外面的Observer<T> 接口实现类,woker是HandlerWorker对象。

 ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

3.为被观察者和观察者之间建立订阅关系

这里的sourceObservableSubscribeOn

小结:

ObserveOnObserver持有了observerinterface Observer<T>

ObservableSubscribeOn.subscribe(observer)

接着查看ObservableSubscribeOnsubscribeActual(observer)方法,源码如下:

# ObservableSubscribeOn.java
@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

1.创建一个Observer观察者 =>SubscribeOnObserver<T>

SubscribeOnObserver<T>持有一个传进来的observer,这个传进来的observer是我们上一步创建的ObserveOnObserver对象

2.调用ObserveOnObserver.onSubscribe(parent)

查看源码:

# ObserveOnObserver.java
@Override
        public void onSubscribe(Disposable d) {
              // ...
                downstream.onSubscribe(this);
            }
        }

这里的downstream下游即我们传进来的观察者observer,也就是ObserveOnObserver所持有的interface Observer<T>的实现了,也就是最外层的代码:

@Override
            public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
                System.out.println("onSubscribe");
            }
  1. scheduler.scheduleDirect(new SubscribeTask(parent))其实就是执行线程,scheduler就是我们传进来的线程调度器Schedulers.IO,
    SubscribeTask实现了Runnable,查看它的run方法:
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

它的run方法只有一句代码:source.subscribe(parent);parent是SubscribeOnObserver对象,source是传递给 ObservableSubscribeOn的source,也就是最开始的ObservableCreate对象。因此实际上是调用了ObservableCreatesubscribe()方法,并传入了SubscribeOnObserver对象;

小结

SubscribeOnObserver持有了observerObserveOnObserver
onSubscribe方法是最先执行的方法,并且它的线程和当前线程一样,因为此时还没有执行线程切换。
ObservableCreate.subscribeActual(observer)方法是运行在子线程的。

ObservableCreate.subscribe(observer)

查看ObservableCreatesubscribeActual方法源码如下:

# ObservableCreate.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
       source.subscribe(parent);
    }
  1. 创建了一个事件发射器CreateEmitter,并传入了observer;

这里的observerSubscribeOnObserver对象

2.SubscribeOnObserveronSubscribe()方法只是为了设置Dispoable的值,不影响流程;
3.为被观察者和观察者之间建立订阅关系

这里的sourceObservableOnSubscribe

这里开始执行,外层页面代码的subscribe部分,开始发射数据

@Override
 public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
 }

小结

emitter 发射器发送数据是在子线程。

CreateEmitter.onNext()

上一步的emitter是CreateEmitter<T>,createEmitter的onNext()源码如下:

# CreateEmitter.java
public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

这里的observerSubscribeOnObserver,查看SubscribeOnObserver的onNext()

# SubscribeOnObserver.java
final Observer<? super T> downstream;
SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
@Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

这里的downStreamSubscribeOnObserver的构造函数传进来,SubscribeOnObserver是在ObservableSubscribeOn.subscribeActual创建的,downStream就是ObserveOnObserver对象,查看ObserveOnObserver的onNext:

#ObserveOnObserver.java
 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

最终是调用了worker执行任务,ObserveOnObserver本身实现了Runnable,它的run()方法中调用了ObserveOnObserver所持有的observeronNext方法,ObservableObserveOn所持有的正是最外层的Observer<T> 接口实现类,因此最后执行了

@Override
            public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
                System.out.println(integer);
            }

小结

worker.schedule(this);这里又切换了线程到主线程,worker正是之前AndroidThread创建的HandlerWorker,其内部持有主线程关联的handler,因此最后的onNext是在主线程执行。
b784f4ad31217d419faedc0d0efc595f.png

只需要理解,每次 observerOn 和 subscribeOn 的时候,内部都会创建一个新的 observable 和 observer。

新创建的 observable 会引用前面的 observable,就是代码中我们分析的 source 变量。
新创建的 observer 会引用前面的 observer,就是代码中我们分析的 observer 变量。

最后我们 subscribe 的时候,是调用的最后创建的 observable 的方法。而每个 observable 内部又调用了 source 的 subscribe 方法,这样就形成了一层一层往前传递的调用链。当调用到最前面的一个 observable 的时候,就是我们自己创建的 observable,在这里我们需要手动触发与该 observable 对应的 observer 对象的 onNext 方法。而 observer 的 onNext 方法的内部又调用了 downstream 的 onNext 方法,这样就形成了一层一层往后传递的调用链。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容