RxJava2.0 - 文章一

前言

自己在学习RxJava2.0时,参考了大神的博客,然后在这里做一个笔记为了方便自己以后复习和查看,同时也给需要的小伙伴借鉴,RxJava2.0自己也仿照大神那样,做一个系列的文章,当然肯定没有大神写的那么好、那么完整、那么详细,自己只是做一个小的总结。

RxJava2.0 - 文章一
RxJava2.0 - 文章二
RxJava2.0文章三 - Map和FlatMap操作符的用法
RxJava2.0文章四 - Zip操作符的用法
RxJava2.0文章五 - Backpressure操作符
RxJava2.0文章六 - 解决上游、下游发射事件速度不平衡问题
RxJava2.0 - 文章七
RxJava2.0 - 文章八

1. RxJava基本工作原理


下边通过两根水管代替观察者与被观察者来解释它两个之间的关系,从事件流角度来说明其原理。


RxJava原理图.png

上边水管叫上游,是事件产生的水管;
下边水管叫下游,是事件接收的水管;
把上游和下游通过一定的方式连接,使得上游每产生一个事件,下游都能收到事件;

对应关系如下:
上游对应Observable;
下游对应Observer;
连接是subscribe();

2. 示例代码如下


2.1: 最基本使用
/**
     * 最基本使用
     */
    public static void demo1(){
        // 创建一个上游 Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }) ;

        // 创建一个下游 Observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , "" + value) ;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        } ;

        // 建立连接
        observable.subscribe(observer);
    }

运行结果如下:

04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: subscribe
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 1
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 2
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: 3
04-26 09:58:34.796 27647-27647/cn.novate.rxjava2 E/TAG: complete

注意:只有把上游和下游连接起来,上游才会开始发送事件,即就是只有调用了subscribe()方法之后,上游才会开始发送事件。

2.2: 链式调用把上边代码连接起来
public static void demo2(){
        ObservableAll.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , ""+value) ;
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        });

        // 
    }

运行结果和demo1()方法一样;

3. 解释ObservableEmitter和Disposable


3.1: ObservableEmitter

Emitter是发射器的意思,用于发射emitter.onNext()、emitter.onError()、emitter.onCompleted()这3个类型的事件,且只能发送这3种事件,不能随意发射,必须满足以下规则:
1>:上游可以发送无限个onNext(),下游也可以接受无限个onNext();
2>:上游发送一个 onCompleted()之后,上游 onCompleted()之后的事件还可以继续发送,而下游一旦 接收到 onCompleted(),就不会继续接收事件;
3>:上游发送一个 onError()之后,上游 onError()之后的事件还可以继续发送,而下游一旦接收到 onError(),就不会继续接收事件;
4>:上游可以不发送 onCompleted()、onError();

3.2: Disposable

Disposable是一次性用品,用完即可丢弃。
对于上边管道问题,可以把Disposable理解为上游和下游的一个开关,一旦调用 Disposable.dispose()方法就会把两个管道切断,导致下游接收不到事件;

注意:调用Disposable.dispose()方法,只会导致下游接收不到事件,但是上游还是可以继续发送事件的

下边写一个示例代码,用于测试,在 调用 Disposable.dispose()方法之后,上游可以继续发送事件,但是下游接收不到事件:

/**
     *  让上游依次发送1、2、3、complete、4,在下游接收到第二个事件后,调用 dispose()切断水管
     */
    public static void demo3(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                emitter.onComplete();

                Log.e("TAG" , "emit 4") ;
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable mDisposable ;
            private int i ;
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG" , "subscribe") ;
                mDisposable = d ;
            }

            @Override
            public void onNext(Integer value) {
                Log.e("TAG" , "next" + value) ;

                i++ ;
                if (i == 2){
                    Log.e("TAG" , "dispose") ;
                    mDisposable.dispose();
                    Log.e("TAG" , "isDisposed:" + mDisposable.isDisposed()) ;
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG" , "error") ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        });

    }

运行结果如下:

04-26 10:39:25.721 13802-13802/cn.novate.rxjava2 E/TAG: subscribe
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 1
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: next1
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 2
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: next2
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: dispose
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: isDisposed:true
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: emit 3
04-26 10:39:25.722 13802-13802/cn.novate.rxjava2 E/TAG: complete
04-26 10:39:25.723 13802-13802/cn.novate.rxjava2 E/TAG: emit 4
右上边运行结果可知:

1>:最先调用 onSubscribe()方法;
2>:在 下游的 onNext() 方法接收到 2事件时候,然后调用 Disposable.dispose()方法切断水管,可以看到上游仍然发送了 3、complete、4,这几个事件;
3>:并且上游并没有因为 调用 onComplete()方法而停止发送事件;

4. subscribe()有多个重载方法


public final Disposable subscribe() {}
public final void subscribe(Observer<? super T> observer) {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}

1>:第一个是不带任何参数的,表示下游不关心任何事件,上游尽管发送数据;
2>:第二个是带有 Observer参数的上边已经使用过了;
3>:第三个是带有一个 Consumer参数的,表示下游只关心onNext()事件,其他事件不关心,所以说如果只需要onNext()事件,下边用示例代码说明:

/**
     * 带有一个参数的 Consumer参数的,表示只关心 onNext()事件,其余事件不关心
     */
    public static void demo4(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                Log.e("TAG" , "complete") ;
                emitter.onComplete();

                Log.e("TAG" , "emit 4") ;
                emitter.onNext(4);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG" , "next " + integer) ;
            }
        }) ;
    }

运行结果如下:

04-26 11:06:46.858 4915-4915/cn.novate.rxjava2 E/TAG: emit 1
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 1
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 2
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 2
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 3
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: next 3
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: complete
04-26 11:06:46.859 4915-4915/cn.novate.rxjava2 E/TAG: emit 4

下边带有2个参数、3个参数、4个参数原理是一样的,就不解释了,自己可以写示例代码来验证。

以上就是教程一的全部内容。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容