【Android】Rxjava2 Flowable详解与背压那些事

1.Rxjava1中的背压

Rxjava2中有这么一个被观察者Flowable,同样作为被观察者,它和Observable有什么区别呢,在Rxjava2中,Observable不再支持背压,而新增的Flowable支持背压,何为背压,就是上游发送事件的速度大于下游处理事件的速度所产生的现象。

我们来看个例子,先把rxjava切换到rxjava1.0:

implementation 'io.reactivex:rxjava:1.1.6'
implementation 'io.reactivex:rxandroid:1.2.1'

然后执行如下代码:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                //观察者每1s才处理一个事件
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + aLong);
                    }
                });

执行结果如下:


image.png

我特?说好的背压呢,说好的异常呢,不要慌,因为上面的代码是同步的情况,都是运行在祝线程的,所以同步的情况下,被观察者每发送一个事件,观察者就会处理一个事件,等观察者处理完当前事件后,被观察者才会继续发送事件,两者分工明确,恩爱和睦,不存在发送速度不一致的情况。

下面我们来看下异步的情况:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.interval(1, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.newThread())
                //观察者在子线程中每1s处理一个事件
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + aLong);
                    }
                });

运行后就会出现如下异常:


image.png

出现了背压的情况,抛出了MissingBackpressureException异常,异步情况下被观察者发送事件是比较暴力的,一次性全部发完,放在缓存池,然后观察者一条条慢慢去处理,发送过快就会出现背压的情况.

背压产生的条件:必须是异步的场景下才会出现,即被观察者和观察者处于不同的线程中。

rxjava1中默认的缓存池大小是16,当事件超过就会出现MissingBackpressureException,看如下例子:

    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 17; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .subscribeOn(Schedulers.newThread())
                //将观察者的工作放在新线程环境中
                .observeOn(Schedulers.newThread())
                //观察者处理每1000ms才处理一个事件
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

你看:


image.png

嗯,默认的缓存池为什么是16,这个问题问的好,因为人家rxjava给的默认值就是16啊,不信你看:

    public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
        return buffer(boundary, 16);
    }

rxjava1中也提供了处理背压的操作符onBackpressureBuffer和onBackpressureDrop,下面我们来简单看下onBackpressureBuffer:

        //被观察者在主线程中,每1ms发送一个事件
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 10000; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

运行结果如下:


image.png

其实onBackpressureBuffer也就是增加了缓存池的大小,这个值为Long.MAX_VALUE,当然我们也可以自己指定onBackpressureBuffer(size)的大小:

    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 100; i++) {
                    Log.w("tag", "send ----> i = " + i);
                    subscriber.onNext("i = "+i);
                }
            }
        })
                .onBackpressureBuffer(100)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });
image.png

onBackpressureDrop的作用是当观察者来不及处理事件的时候,会把事件给丢弃掉,而onBackpressureLatest操作符表示当被观察者Observable发出事件的速度比观察者消耗得要快,观察者会接收Observable最新发出的事件进行处理,这两种情况大家可以自行测试感受下。

从上面的例子可以看出,在rxjava1中,interval操作符默认是不支持背压的,我们来试试range操作符:

    Observable.range(1,10000)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer value) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("tag", "---->" + value);
                    }
                });

运行结果如下:


image.png

尼玛,竟然没有出现背压,纳尼?


image.png

表情包好像放错了,走错片场了,哈哈哈,难道range操作符有毛病,不应该啊,最后经过一番查找,发现问题在observeOn操作符上,observeOn这个操作符内部有一个缓冲区,Android环境下长度是16,它会告诉range最多发送16个事件,充满缓冲区即可。

这样可以看出,之前使用的interval操作符是不支持背压的,而range则支持背压,那么到底什么样的Observable支持背压或不支持背压呢?


image.png

其实在rxjava1中,不是所有Observable都支持背压,从上面的例子也可以看出来这一点,我们知道Observable有hot和cold之分,rxjava1中hot observable是不支持背压的,而cold observable中也有一部分不支持背压,这里不再深究,想继续了解可以自行google,另外一个原因是现在都tm Rxjava2了,我还在这扯rxjava1,罪过罪过,我也是为了引出问题。

简单扯一下解决背压的思路,无非是限制发送的速度,俗称限流,很多操作符都可以做到这些,比如sample在一段时间内只处理最后一个数据等,也可以使用rxjava1中提供的onBackpressureBuffer,onBackpressureDrop,onBackpressureLatest。

虽然rxjava1也有处理背压的方法,但设计并不完美,缓存池大小只有16,而且被观察者无法得知下游观察者对事件的处理速度,一次性把事件抛给了下游观察者,所以rxjava2中对背压进行了改进。

2.Rxjava2中的背压

Rxjava2中新增了一个被观察者Flowable用来专门支持背压,默认队列大小128,并且其所有的操作符都强制支持背压,先看个简单的例子:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

运行结果如下:


image.png

说好的支持背压呢,怎么这个熟悉的异常又出现了????


image.png

细心的同学肯定发现了,Flowable.create方法第二个参数BackpressureStrategy.ERROR,这个BackpressureStrategy类其实就是处理背压的策略类,看下这个类的源码:
public enum BackpressureStrategy {
    //不指定背压策略
    MISSING,
    //出现背压就抛出异常
    ERROR,
    //指定无限大小的缓存池,此时不会出现异常,但无限制大量发送会发生OOM
    BUFFER,
    //如果缓存池满了就丢弃掉之后发出的事件
    DROP,
    //在DROP的基础上,强制将最后一条数据加入到缓存池中
    LATEST
}

依次来看下这几种策略的区别吧!

MISSING

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.MISSING)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

不出所料,果然抛出了异常:


MISSING

ERROR

BackpressureStrategy.ERROR上面已经测试过了,不再重复了,依然会报异常。

BUFFER

      Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

运行结果如下,确实不会出现背压异常了,但是内存占用嗖嗖的升高,数据量足够大足够快的时候,OOM指日可待,哈哈哈!!!


BUFFER

DROP

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

运行结果如下:


DROP

可以发现,在填充满了默认的128个大小的缓存池后,丢弃了很多数据,DROP就是干这事的,发不下就不放了,有点狠啊!!

LATEST

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0;i < 1000; i++) {
                    emitter.onNext("i = "+i);
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

这次我们只发送1000个事件,运行结果如下:


LATEST

LATEST策略下,当缓存池满了之后也是会丢弃事件的,不仅如此,它还会把事件的最后一个强制放入到缓存池中,所以可以看到999被观察者收到了。

上面我们都是用的Flowable的create创建的被观察者,如果我们使用just,fromArray等操作符该如何指定背压策略呢?其实也很简单,因为rxjava2像rxjava1那样也提供了onBackpressureBuffer(),onBackpressureDrop(),onBackpressureLatest(),这样用就可以了:

        Flowable.range(1,1000)
                .onBackpressureBuffer(1000)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer s) throws Exception {
                        Log.e("tag","----> "+s);
                    }
                });

嗯,运行结果很稳:


onBackpressureBuffer

那么可能我们会有个疑问,上面的例子都是观察者被动的接收事件,能不能主动拉取事件呢,当然可以,我们看下下面这个例子:

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    emitter.onNext("i = " + i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("tag", "----> " + s);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

看下运行结果:

image.png

此时我们的观察者使用了Subscriber,它有一个onSubscribe方法,参数为Subscription,其实关键点就是这个Subscription,它的作用就是从缓存池拉取事件,它有一个request(count)方法,它的作用就是拉取事件,并可以指定拉取事件的个数。我们在上面的例子中,使用subscription.request(5)每次拉取5个事件,其实也是很简单的。

其实搞了半天,文章基本也要结束了,虽然rxjava提供了处理背压的策略,但是最好还是能尽量避免上游被观察者发送事件过快过多,实在需要处理,就结合各种策略和操作符进行按需处理。

3.项目中的使用

上周在项目中遇到了这么一个场景,就是在跳转页面之前需要释放camera,这是个耗时操作,返回当前页面的时候需要重新open Camera,而且open Camera的时机需要在SurfaceView的create中执行,这个场景刚好用request可以解决,例子和上面类似,就不再上代码了。


大家拜拜

关于Flowable的使用就先到这吧,由于个人水平有限,难免会犯些错误,有问题欢迎留言讨论。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容