RxJava 2.x入门新姿势一

引言

经过几年的发展,响应式编程已经是很流行了,在Android开发中的应用也非常的广泛,身为Android开发者,则是必须掌握的技术。

正文

网上已经有很多很多RxJava相关的文章,视频等等教程,但是说实话对于入门,或者新手来说,确实不好理解,上来就是各种,观察者、被观察者、订阅、发布等等概念,一遍看下来直接就晕了,就感觉RxJava很难,难理解,用的时候也只是依葫芦画瓢,晕乎乎的用着,然后就没有然后了。

这里我都不说那些概念,因为讲概念太抽象,难记住,更难理解。我们用另外一个视角来学习。因为RxJava 1.x的版本 官方已经停止更新了维护了,没有学习过也没有关系,RxJava 2.x是全新的,直接学习使用就好了。

首先假设我们在工厂里上班,工厂都会有流水线,产品经过流水线生产后来订单了销售出去。


事件处理模型

这里假设工厂生产的是一种六边形的“Jerry帅气饼干”,上游是生产车间流水线的事件流,下游是订单产品的销售消费事件流。中间连接上下游关系的暂且叫做“Jerry帅气饼干生产消费订单管理系统”(不要脸,名字写这么长),为了下文方便抒写且用“生产订单管理系统”(PCMS)。以上图上下游对应的就是Obsersvable被观察者也是发布者,下游对应Observer观察者也是订阅者。使用RxJava代码表示上图就是:

public void test1() {

        // 上游生成产品流水线
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: 帅");
                emitter.onNext("帅");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: !!!");
                emitter.onNext("!!!");
                Log.d(TAG, "test1 ====== Observable: ------ onComplete");
                emitter.onComplete();
                Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry帅炸天!!!");
                emitter.onNext("Jerry帅炸天!!!");
            }
        });

        // 下游订单产品销售
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test1 ====== Observer: onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test1 ====== Observer: onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test1 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test1 ====== Observer: onComplete");
            }
        };

        // 连接上下游的订单管理系统
        observable.subscribe(observer);
    }

上述代码,上游生产车间流水线就是Observable,下游订单销售就是Observer,中间通过“生产订单管理系统”subscribe来将上下游连接起来。

运行后输出结果是:


输出结果

从输出结果来看,当上游Observable发出一个生产的饼干产品事件,下游订单销售的Observer就销售一个饼干产品事件,而且当上游调用了onComplete方法后,上游的生产事件还是生产饼干事件(继续生产了“Jerry帅炸天”饼干事件),但是下游的订单销售却没有消费掉。也就是事件产生方调用onComplete方法后,之后的事件还会继续发送,但是事件接收方就不会接收了。

我们来看看Observable的subscribe方法的参数:ObservableEmitter,Emitter顾名思义是发射器的意思,ObservableEmitter接口继承自Emitter接口:

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

接口定义很简单,就三个方法,onNext我们上门已经用过了,是用来发射发送事件的,onComplete是用来表示事件发送完了,后面如果有新的事件发送,下游接收者可以不用处理,onError方法看注释说是发送一个异常事件给下游接收者。到底是不是这样,我们来试试就晓得了。

public void test2() {

        // 上游生成产品流水线
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: 帅");
                emitter.onNext("帅");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: !!!");
                emitter.onNext("!!!");
                Log.d(TAG, "test2 ====== Observable: ------ onError");
                emitter.onError(new IllegalStateException("Jerry饼干烤焦了,卖出去会被打!"));
                Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry帅炸天!!!");
                emitter.onNext("Jerry帅炸天!!!");
            }
        });

        // 下游订单产品销售
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test2 ====== Observer: onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test2 ====== Observer: onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test2 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test2 ====== Observer: onComplete");
            }
        };

        // 连接上下游的订单管理系统
        observable.subscribe(observer);
    }

在上游生产饼干的时候就生产了一个“Jerry饼干烤焦了,卖出去会被打!”的错误饼干事件,下游订单销售的onError出错状态会消费这个事件。而上游在出错事件后发送的“Jerry帅炸天!!!”饼干事件,同样也只是把事件发送了处理,下游订单销售并没有接收处理这个事件。

运行后输出结果:


输出结果

细心的小伙伴应该会发现,每次执行的时候都会先调用下游的onSubscribe方法,这个方法里有个参数Disposable(用完即可丢弃)意思可以理解成,将上下游的连接切断,让上游的生产的饼干不打包放入下游订单销售环节,实际开发中是有这种需求的,当发送事件出问题的时候就需要断开事件接收处理。不像最近的疫苗事件,一些不要脸的生物疫苗公司把生产不合格的疫苗上市销售,伤天害理,谋财害命。下面举个例子:

public void test3() {

        // 上游生成产品流水线
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: 帅");
                emitter.onNext("帅");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: !!!");
                emitter.onNext("!!!");
                Log.d(TAG, "test3 ====== Observable: ------ onComplete");
                emitter.onComplete();
                Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry帅炸天!!!");
                emitter.onNext("Jerry帅炸天!!!");
            }
        });

        // 下游订单产品销售
        Observer<String> observer = new Observer<String>() {

            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test3 ====== Observer: onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test3 ====== Observer: onNext: " + value);
                i++;
                // 第一个事件接收后,就断开上下游连接
                if (i == 1) {
                    Log.d(TAG, "test3 ====== Observer: start disposable");
                    mDisposable.dispose();
                    Log.d(TAG, "test3 ====== Observer: isDisposable: " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test3 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test3 ====== Observer: onComplete");
            }
        };

        // 连接上下游的订单管理系统
        observable.subscribe(observer);
    }

这里我们在下游订单销售的onNext方法中,当接收完第一个饼干事件后,就使用mDisposable.dispose()方法将上下游的连接断开了,断开后上游后续生产的饼干事件,下游就接收不到。

运行的结果:


image.png

上图中,也验证了我们的猜想,当使用dispose断开上下游连接后,下游就无法再继续接收事件了。


这一讲就先介绍这么多,这样的方式理解Observable和Observer以及订阅动作subscribe是不是容易多了,希望对你有所帮助,下一讲使用RxJava来切换变化饼干事件处理的线程(主线程、子线程)。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 1,462评论 2 1
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,018评论 1 9
  • 引入依赖: implementation 'io.reactivex.rxjava2:rxandroid:2.0....
    为梦想战斗阅读 1,300评论 0 0
  • 观察者模式的运用 传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析 RxJava 是基于Jav...
    正规程序员阅读 2,632评论 0 52
  • 学习写作良久,断断续续地也写了些文章。每当灵感来临之际,总会提笔写上几文。一次,把文章拿给精于写作的老师看,他认认...
    熊一一阅读 1,524评论 0 1