RxJava 的消息订阅

1 简单使用步骤

1、创建被观察者(Observable),定义要发送的事件。
2、创建观察者(Observer),接受事件并做出响应操作。
3、观察者通过订阅(subscribe)被观察者把它们连接到一起。

2 RxJava的消息订阅例子

  //步骤1. 创建被观察者(Observable),定义要发送的事件。
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        //步骤2. 创建观察者(Observer),接受事件并做出响应操作。
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        //步骤3. 观察者通过订阅(subscribe)被观察者把它们连接到一起。
        observable.subscribe(observer);

其输出结果为:

onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete

3 源码分析

3.1 创建被观察者过程

首先来看下创建被观察者(Observable)的过程,上面的例子中我们是直接使用Observable.create()来创建Observable

3.1.1 Observable类的create()

创建一个ObservableCreate对象出来,然后把我们自定义的ObservableOnSubscribe作为参数传到ObservableCreate中去,最后就是调用 RxJavaPlugins.onAssembly()方法。

3.1.2 ObservableCreate类

public final class ObservableCreate<T> extends Observable<T> {//继承自Observable
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//把我们创建的ObservableOnSubscribe对象赋值给source。
    }
}

可以看到,ObservableCreate是继承自Observable的,并且会把ObservableOnSubscribe对象给存起来

3.1.3 RxJavaPlugins类的onAssembly()

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //省略无关代码
        return source;
    }

很简单,就是把上面创建的ObservableCreate给返回。

3.1.4 简单总结

Observable.create()中就是把我们自定义的ObservableOnSubscribe对象重新包装成一个ObservableCreate对象,然后返回这个ObservableCreate对象

3.1.5

Observable.create()的时序图如下所示:


3.2 订阅过程

3.2.1 Observable类的subscribe()

    public final void subscribe(Observer<? super T> observer) {
            //省略无关代码

            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);

            //省略无关代码
    }

可以看到,实际上其核心的代码也就两句,我们分开来看下:

3.2.2 RxJavaPlugins类的onSubscribe()

    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        //省略无关代码

        return observer;
    }

跟之前代码一样,这里同样也是把原来的observer返回而已。
再来看下subscribeActual()方法。

3.2.3 Observable类的subscribeActual()

 protected abstract void subscribeActual(Observer<? super T> observer);

Observable类的subscribeActual()中的方法是一个抽象方法,那么其具体实现在哪呢?还记得我们前面创建被观察者的过程吗,最终会返回一个ObservableCreate对象,这个ObservableCreate就是Observable的子类,我们点进去看下:

3.2.4 ObservableCreate类的subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //触发我们自定义的Observer的onSubscribe(Disposable)方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到,subscribeActual()方法中首先会创建一个CreateEmitter对象,然后把我们自定义的观察者observer作为参数给传进去。这里同样也是包装起来

这个CreateEmitter实现了ObservableEmitter接口和Disposable接口,如下:

这个CreateEmitter实现了ObservableEmitter接口和Disposable接口,如下:

然后就是调用了observer.onSubscribe(parent),实际上就是调用观察者的onSubscribe()方法,即告诉观察者已经成功订阅到了被观察者。

继续往下看,subscribeActual()方法中会继续调用source.subscribe(parent),这里的source就是ObservableOnSubscribe对象,即这里会调用ObservableOnSubscribe的subscribe()方法。

我们具体定义的subscribe()方法如下:

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

ObservableEmitter,顾名思义,就是被观察者发射器。
所以,subscribe()里面的三个onNext()方法和一个onComplete()会逐一被调用。

3.2.5 CreateEmitter类的onNext()和onComplete()等

        //省略其他代码

        @Override
        public void onNext(T t) {
            //省略无关代码
            if (!isDisposed()) {
                //调用观察者的onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //调用观察者的onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

可以看到,最终就是会调用到观察者的onNext()和onComplete()方法。

可以看到,上面有个isDisposed()方法能控制消息的走向,即能够切断消息的传递,这个后面再来说。

3.2.6 简单总结

Observable(被观察者)和Observer(观察者)建立连接(订阅)之后,会创建出一个发射器CreateEmitter,发射器会把被观察者中产生的事件发送到观察者中去,观察者对发射器中发出的事件做出响应处理。

可以看到,是订阅之后,Observable(被观察者)才会开始发送事件

3.2.7 时序流程图

再来看下订阅过程的时序流程图:


4 切断消息

4.1 切断消息

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "切断观察者与被观察者的连接");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribe(observer);

输出结果为

onSubscribe : null
onNext : 文章1
切断观察者与被观察者的连接

可以看到,要切断消息的传递很简单,调用下Disposable的dispose()方法即可。调用dispose()之后,被观察者虽然能继续发送消息,但是观察者却收不到消息了
另外有一点需要注意,上面onSubscribe输出的Disposable值是"null",并不是空引用null。

4.2 切断消息源码分析

Disposable是一个接口,可以理解Disposable为一个连接器,调用dispose()后,这个连接器将会中断。

其具体实现在CreateEmitter类.

4.2.1 CreateEmitter的dispose()

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

就是调用DisposableHelper.dispose(this)而已。

4.2.2 DisposableHelper类

public enum DisposableHelper implements Disposable {

    DISPOSED
    ;

    //其他代码省略

    public static boolean isDisposed(Disposable d) {
        //判断Disposable类型的变量的引用是否等于DISPOSED
        //即判断该连接器是否被中断
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            //这里会把field给设为DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

可以看到DisposableHelper是一个枚举类,并且只有一个值:DISPOSED。dispose()方法中会把一个原子引用field设为DISPOSED,即标记为中断状态

因此后面通过isDisposed()方法即可以判断连接器是否被中断。

4.2.3 CreateEmitter类中的方法

        @Override
        public void onNext(T t) {
            //省略无关代码

            if (!isDisposed()) {
                //如果没有dispose(),才会调用onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                //如果dispose()了,会调用到这里,即最终会崩溃
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //省略无关代码
            if (!isDisposed()) {
                try {
                    //如果没有dispose(),才会调用onError()
                    observer.onError(t);
                } finally {
                    //onError()之后会dispose()
                    dispose();
                }
                //如果没有dispose(),返回true
                return true;
            }
            //如果dispose()了,返回false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //如果没有dispose(),才会调用onComplete()
                    observer.onComplete();
                } finally {
                    //onComplete()之后会dispose()
                    dispose();
                }
            }
        }
  1. 如果没有dispose,observer.onNext()才会被调用到。

  2. onError()和onComplete()互斥,只能其中一个被调用到,因为调用了他们的任意一个之后都会调用dispose()。

  3. 先onError()后onComplete(),onComplete()不会被调用到。反过来,则会崩溃,因为onError()中抛出了异常:RxJavaPlugins.onError(t)。实际上是dispose后继续调用onError()都会炸。

参考

详解 RxJava 的消息订阅和线程切换原理

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,337评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,560评论 3 406
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,632评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,219评论 1 303
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,219评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,670评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,018评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,000评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,552评论 1 326
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,565评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,692评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,280评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,009评论 3 341
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,435评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,587评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,276评论 3 383
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,752评论 2 367

推荐阅读更多精彩内容