RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介

依照惯例, 先放两个能跑的代码
!!本节中所有 Item 均为这段代码!!

// item.kt
data class Item(val id: Int) {
    init {  // 锚点1
        println("Item Created $id")
    }
}

能跑的代码

// 7.1.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.just(1, 2, 3).map { Item(it) }  // 注释1
            .subscribe({
                println("Received $it")
                Thread.sleep(100)
            })
    Thread.sleep(1000)
}

输出

Item Created 1
Received Item(id=1)
Item Created 2
Received Item(id=2)
Item Created 3
Received Item(id=3)

注释1
这里 Map 的意义就是利用对象初始化(锚点1)时能打印出来一个值这个特性, 便于我们看 一个值被弹出的时间点 (用来和 这个值被处理的时间点 作比较), 把数字转不转化成对象在这里没有特殊含义(是副产品)

上面代码主旨大意

从输出中可以看到基本流程是:
一个值被 Observable 弹出 -> 被 Observer 处理 -> 下一个值被弹出 -> ...
这是因为 ObservableObserver 运行在一个线程中, 所以在 Observer 没处理完上一个值之前 Observable 是不能弹出下一个值的。那如果 ObservableObserver 运行在不同的线程中呢?来看下一个例子

// 7.2.kt
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Observable.just(1, 2, 3).map { Item(it) }
            .observeOn(Schedulers.newThread())  // 注释1  Scheduler 会在之后的章节说明
            .subscribe({
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(1000)
}

输出

Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)

注释1
这里只需要知道这一行代码使得 Observer 在另一个线程中运行即可

上面代码主旨大意

从输出中可以看到 Observable 完全不会顾及 Observer 的感受, 只会一个值接一个值的弹。如果此时 Observer 执行某些耗时计算(即 值被 Observable 弹出的速度远远快于 被 Observer 处理的速度)(这里耗时计算我们用 sleep 模拟), 那么这些值在未被处理之前都会累积在内存中,如果数据量很大甚至可以导致内存溢出(OutOfMemory)。

如果数据的 消费者(在这里是 Observer) 可以通过某种渠道告知数据的 生产者(在这里是 Observable) "先别弹出值了, 等我把累积的值处理完你再接着弹" 这一信息呢?

这个反馈渠道是存在的, 在 ReactiveX 中, 我们叫它 Backpressure (背压)

Backpressure

背压 , 指的是后端的压力, 通常用于描述系统排出的流体在出口处或二次侧受到的与流动方向相反的压力。----百度百科

ObservableObserver 是不支持 Backpressure 的。替代方案是 FlowableSubscriber

Backpressure 生产者 消费者
不支持 Observable Observer
支持 Flowable Subscriber

FlowableReactiveX 2.x (RxKotlin 2.X) 中被加入, 之前的版本不包括它。

Flowable

// 7.3.kt
import io.reactivex.Flowable  // 和 7.2.kt 相比, 这一行有改变
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.just(1, 2, 3).map { Item(it) }  // 和 7.2.kt 相比, 这一行有改变
            .observeOn(Schedulers.newThread())
            .subscribe({  // 注释1
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(1000)
}

输出

Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)

注释1
这里是 Subscriber 而不是 Observer, 但是由于用的 Lambda 形式, 看起来一样。我们会在之后讨论。
上面的输出和 7.2.kt 的输出完全一致, 我一会说为什么没有效果
看下一个例子

// 7.4.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 260).map { Item(it) }  // 和 7.3.kt 相比, 只有这一行有改变
            .observeOn(Schedulers.newThread())
            .subscribe({
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(27000)
}

输出(原谅我吧, 数据量太小根本没有效果)

Item Created 1
Item Created 2
...
Item Created 127
Item Created 128  // 当 Flowable 弹出 128 个值就暂时停止了, 缓冲区达到上限(128)
Received Item(id=1)
Received Item(id=2)
...
Received Item(id=95)
Received Item(id=96)  // Subscriber 仅仅处理了 96 个值, 缓冲区没有被清空
Item Created 129
Item Created 130
...
Item Created 223
Item Created 224  // 缓冲区再次达到上限 224-96=128
Received Item(id=97)
Received Item(id=98)
...
Received Item(id=191)
Received Item(id=192)
Item Created 225
Item Created 226
...
Item Created 259
Item Created 260  // 这次是因为没有值了
Received Item(id=193)
Received Item(id=194)
...
Received Item(id=259)
Received Item(id=260)

Flowable 不会一下子把所有值全部弹出, 它会一块一块的弹, 当 Subscriber 跟上时才会继续
Flowable 会维护一个默认大小为 128 个元素的缓冲区, 被弹出的元素会暂存其中。如果满了 Flowable 就会暂时停止弹射。

7.3.kt 数据量还没有达到缓冲区上限, 所以看起来没有效果。
7.3.kt 这个例子, 还有其他原因, 之后再说。

Subscriber

Subscriber 其实可以 动态 限定从上游拿到的值的数量(动态 这个词接下来就会解释,不要急), 但是我们之前一直用的 Lambda 形式, 系统默认从上游获取所有值。
我们来看一看如何限定这个数量。
先来看一个 Subscriber(有点长,并且本节其它的 Subscriber 都这么长, 非常抱歉)

// subscriber_1.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val subscriber_1 = object : Subscriber<Item> {
    override fun onSubscribe(subscription: Subscription) {
        subscription.request(4)  // 注释1  限定请求 4 个值
        println("New Subscription ")
    }

    override fun onNext(s: Item) {
        Thread.sleep(200)
        println("Subscriber received " + s)
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done!")
    }
}

可以通过下面来调用上面的 subscriber_1

// 7.5.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 6)
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_1)  // 注释2
    Thread.sleep(2000)
}

输出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)  // 注释3

注释1
如果删掉这一行, 我们就没有限定请求数量, 一个值都接收不到(我原本以为会接收到所有值的, 做了实验才发现和我想的不太一样)
注释2
我没有在接下来都使用一个 Subscriber 来避免重复代码
但是我会对代码进行处理,把和上一个不一样的地方用注释标识出来
因为 Subscriber 的重点在通过不同的配置来控制流速和总量。
为了灵活, 我选择创建多个 Subscriber
注释3
我们只请求 4 个值, 数据流并没有到结尾, 系统没有调用 onComplete 方法。所以没有输出 Done!
我再举一个例子, 这个例子只是想说明何时 onComplete 会被调用

// 7.6.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 4)  // 只有这一行与 7.5.kt 相比有变动
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_1)
    Thread.sleep(2000)
}

输出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Done!  // onComplete 被调用了

当且仅当数据流真正到达末尾时才会触发 onComplete。限定数量是不会的

之前我说 Subscriber 可以 动态 限定从上游拿到的值的数量, 这个动态可以从下面体现

// subscriber_2.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val subscriber_2 = object : Subscriber<Item> {
    lateinit var subscription: Subscription  // 与 subscriber_1 相比多了这一行
    override fun onSubscribe(subscription: Subscription) {
        this.subscription = subscription  // 与 subscriber_1 相比多了这一行
        subscription.request(4)
        println("New Subscription ")
    }

    override fun onNext(s: Item) {
        Thread.sleep(200)
        println("Subscriber received " + s)
        if (s.id == 4) {                    // |\
            println("Requesting two more")  // | \
            subscription.request(2)         // | /--- 与 subscriber_1 相比多了这几行
        }                                   // |/
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done!")
    }
}
// 7.7.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 6)
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_2) // 只有这一行与 7.5.kt 相比有变动
    Thread.sleep(2000)
}

输出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Requesting two more  // 决定再从上游拿 2 个值进行处理
Subscriber received Item(id=5)
Subscriber received Item(id=6)
Done!

OK 这一节就到这里。下一节我们说一说 Flowable 把速度降下来的几种方式~

RxKotlin 例子不超过15行教程 1----环境配置与初体验

RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介

RxKotlin 例子不超过15行教程 3----Observable 的创建

RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable

RxKotlin 例子不超过15行教程 5----Subject

RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram

RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介

RxKotlin 例子不超过15行教程 8----Error Handling

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容