手把手教你使用 RxJava 2.0(三)

本节介绍的是关于Flowabale的使用,以及RxJava 2.x中的backpressure的处理策略。这部分内容应当是RxJava 2.x中改动最大的一部分。但同时也能解决一些应用场景中的问题,使得我们的RxJava更加强大。

Flowable的产生

在RxJava中会经常遇到一种情况就是被观察者发送消息十分迅速以至于观察者不能及时的响应这些消息。
例如下面这种情况:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                while (true){
                    e.onNext(1);
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                System.out.println(integer);
            }
        });

被观察者是事件的生产者,观察者是事件的消费者。上述例子中可以看出生产者无限生成事件,而消费者每2秒才能消费一个事件,这会造成事件无限堆积,最后造成OOM。

因此问题来了,要怎么处理这些慢慢堆积起来的消息呢?

Flowable就是由此产生,专门用来处理这类问题。
关于上述的问题,有个专有的名词来形容上述现象,即:Backpressure(背压)。所谓背压,即生产者的速度大于消费者的速度带来的问题。
在原来的RxJava 1.x版本中并没有Flowable的存在,Backpressure问题是由Observable来处理的。在RxJava 2.x中对于backpressure的处理进行了改动,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。原先的Observable已经不具备背压处理能力。

到此,我们便知道了Flowable是为了应对Backpressure而产生的。Flowable是一个被观察者,与Subscriber(观察者)配合使用,解决Backpressure问题。

下面我们就具体讲解处理Backpressure的策略。
注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。

处理Backpressure的策略

在讲具体策略之前,我们要具体分析下什么情况下才会产生Backpressure问题?
1.如果生产者和消费者在一个线程的情况下,无论生产者的生产速度有多快,每生产一个事件都会通知消费者,等待消费者消费完毕,再生产下一个事件。所以在这种情况下,根本不存在Backpressure问题。即同步情况下,Backpressure问题不存在。
2.如果生产者和消费者不在同一线程的情况下,如果生产者的速度大于消费者的速度,就会产生Backpressure问题。即异步情况下,Backpressure问题才会存在。

现在我们已经知道了具体产生Backpressure问题的原因及场景。那我们就可以通过学习下面处理Backpressure问题的策略来解决问题了。
以下实例都是异步操作

ERROR

这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。

我们先以代码示例介绍一下Flowable相比与Observable新的东西。

Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }
            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
        flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 1
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 2
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 3
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit complete
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 1
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 2
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 3
01-20 16:18:58.908 17829-17829/? D/MainActivity: onComplete

上述代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),可以看到程序如我们预期的一样输出结果了。不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

  s.request(Long.MAX_VALUE);  

这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。
当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。
这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要求多少,我便传给你多少。

注意:如果不显示调用request就表示消费能力为0。

虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。形象点就是不能打肿脸充胖子。
而ERROR策略就避免了这种情况的出现(讲了这么多终于出现了)。

在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 125
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 126
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 127
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 128
01-20 16:58:49.003 32434-32434/? W/MainActivity: onError:
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

我们让Flowable发送129个事件,而Subscriber一个也不处理,就产生了异常。
因此,ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。

BUFFER

所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
总之BUFFER要慎用。

DROP

看名字就可以了解其作用:当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
下面例子具体介绍:



点击“开始”按钮,建立连接。生产者开始生产事件,刚开始消费者通过request()只要了50个事件消费。然后每次点击“消费”按钮,再次消费50个事件。

mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {

                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP);
        mSubscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                mSubscription = s;
                s.request(50);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {

            }
        };
    }
    public void start(View view){
        mFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mSubscriber);

    }
    public void consume(View view){
        mSubscription.request(50);

    }

01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 99
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 100
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749078
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749099

可以看出,生产者一次性传入128个事件进入缓存池。点击“开始”按钮,消费了50个。然后第一次点击“消费”按钮,又消费了50个,第二次点击“消费”按钮,再次消费50个。然而此时原来的128个缓存只剩下28个了,所以先消费掉28个,然后剩下22个是后来传入的(其实后来的是在消费了96个后传入,并一次性在缓存池中又传入了96个,具体可以看源码,这里不解释了)。

LATEST

LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。
结果如下:

01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:50:31.569 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 100
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 101
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 999999

唯一的区别就在最后一行。这就是LATEST与DROP的区别。

上述例子Flowable对象的获取都是通过create()获取的,自然可以通过BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。

 Flowable.just(1).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        
                    }
                });

以上就是关于Flowable及backpressure的内容。由于水平有限,关于RxJava 2.0 的学习内容就暂时只有这么多了。接下来会继续学习,如果有新的理解和认识,会再来分享。

手把手教你使用 RxJava 2.0(一)
手把手教你使用 RxJava 2.0(二)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容