RxJava2--Flowable与BackPress

转载自:Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解

背压介绍

当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。

如果上下游处在同一个线程中,则不会出现背压的问题。因为下游处理完时间后,上游才会发射。

Flowable

大量数据处理需要用Flowable,而小数据则使用Observable即可

由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable慢得多。

由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题。

所以,如果能够确定:

  1. 上下游运行在同一个线程中,
  2. 上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
  3. 上下游工作在不同的线程中,但是数据流中只有一条数据

则不会产生背压问题,就没有必要使用Flowable,以免影响性能。

Flowable的使用

下例使用了Flowable来发射事件,大体与Observable类似,只是有几点区别:

  1. Flowable发射数据时,使用特有的发射器FlowableEmitter,不同于Observable的ObservableEmitter
  2. create方法中多了一个BackpressureStrategy类型的参数,该参数负责当BackPress产生的时候,对应的Emitter的处理策略是什么样的
  3. onSubscribe中接收的不是Dispose,而是Subscription对象,并且调用了s?.request(10)
 Flowable.create<Int>({ emitter ->
            emitter.onNext(1)
            emitter.onComplete()
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Subscriber<Int> {
                   override fun onSubscribe(s: Subscription?) { s?.request(10)}

                    override fun onNext(t: Int?) = System.out.println("onNext...$t")

                    override fun onComplete() = System.out.println("onComplete")

                    override fun onError(t: Throwable?) = System.out.println("onError...$t")
                })

BackpressureStrategy背压策略

BackPress策略有这几种:

  • MISSING
  • ERROR
  • BUFFER
  • DROP
  • LATEST

当上游发送数据的速度快于下游接收数据的速度,且运行在不同的线程中时,Flowable通过自身特有的异步缓存池,来缓存没来得及处理的数据,缓存池的容量上限为128条。

当缓存池的容量超过128条时,就会触发Backpress的应对策略。

BackpressureStrategy的作用便是用来设置Flowable通过异步缓存池缓存数据的策略。在FlowableCreate类中看到,在设置完BackpressureStrategy之后,对应的Strategy会根据映射生成不同Emitter:

  • MISSING ----> MissingEmitter
    在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
  • ERROR ----> ErrorAsyncEmitter
    在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。
  • DROP ----> DropAsyncEmitter
    如果Flowable的异步缓存池满了,会丢掉上游发送的数据。由于Emitter都是继承自AutomicLong或者其他的原子数据,所以通过get()得到的就是缓存池数据剩下的数量,如果为0,代表缓存池已经满了。
  • LATEST ----> LatestAsyncEmitter
    与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据
  • BUFFER ----> BufferAsyncEmitter
    默认的策略。如果Flowable默认的异步缓存池满了,会通过该Emitter中新增的缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM

背压操作符

Backpress操作符一共有这些:

  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest

主要的作用就是,当Flowable不是通过create创建时,没有传入BackpressStrategy,则可以通过这些操作符来指定BackpressStrategy。例如:

Flowable.range(0, 500).onBackpressureDrop()

Flowable的响应式拉取

Flowable在设计的时候,采用了一种新的思路——响应式拉取方式,来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。

如果不显示调用request则默认下游的需求量为零,上游Flowable发射的数据不会交给下游Subscriber处理。而多次调用则会将该数累加:

Flowable.create<Int>({ emitter ->
            repeat(3) {
                Log.e(TAG, "emitter.request:${emitter.requested()}")
                emitter.onNext(it)
            }
            emitter.onComplete()
        }, BackpressureStrategy.ERROR).onBackpressureBuffer()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Subscriber<Int> {
                    override fun onSubscribe(s: Subscription?) {
                        //  累加到2 
                        s?.request(1)
                        s?.request(1)
                    }

                    override fun onNext(t: Int?) = System.out.println("onNext...$t")

                    override fun onComplete() = System.out.println("onComplete")

                    override fun onError(t: Throwable?) = System.out.println("onError...$t")
                })

就会输出:

onNext...0
onNext...1
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容