ShareFlow

一、通过 SharedFlow 实现广播

通过 SharedFlow 可以实现类似广播的功能:

runBlocking {
    val data = MutableSharedFlow<Int>()
    val job1 = launch {
        data.collect {
            println("first : received $it")
        }
    }
    val job2 = launch {
        data.collect {
            println("second: received $it")
        }
    }
    delay(1000)
    data.emit(1)
    delay(1000)
    job1.cancel()
    job2.cancel()
}

运行程序,输出如下:
first : received 1
second: received 1

需要注意的是,SharedFlow 默认无法收到 collect 前发射的值,在本例中,如果把 data.emit(1) 前的 delay(1000) 去掉,则有概率接收不到发射的 1,这是因为协程的启动需要时间,如果 在协程启动之前,emit(1) 就被调用了,那么协程启动后,就会收不到之前发射的值。

二、通过 replay 指定保留多少粘性数据

SharedFlow 的构造器中,有一个 replay 参数,它就是用来实现粘性数据的。它的意义是接收方 collect() 时,将多少个之前缓存的数据发送给接收方。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
}

SharedFlow 的 replay 默认是 0,也就是默认无粘性。
接下来我们将 replay 设置为 2,测试一下:

runBlocking {
    val data = MutableSharedFlow<Int>(2)
    data.emit(1)
    data.emit(2)
    val job1 = launch {
        data.collect {
            println("first : received $it")
        }
    }
    val job2 = launch {
        data.collect {
            println("second: received $it")
        }
    }
    delay(1000)
    job1.cancel()
    job2.cancel()
}

将 replay 设置成 2 后,相当于 SharedFlow 拥有了一个容量为 2 的缓存区。当有新订阅者出现时,SharedFlow 会把缓存的元素发送给订阅者。
运行程序,输出如下:

first : received 1
first : received 2
second: received 1
second: received 2

可以看到,虽然 collect() 在 emit() 之后,但由于缓存区的存在,SharedFlow 发出的数据还是被接收到了。

三、SharedFlow 的缓存区

SharedFlow 的构造函数中,第二个参数名为 extraBufferCapacity,译为「额外的缓存容量」。它的作用是处理背压。当下游消费速率过低时,数据会被发送到缓存区中。
所以,SharedFlow 的缓存区大小为 replay + extraBufferCapacity。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    ...
}

SharedFlow 的构造函数中,第三个参数名为 onBufferOverflow,译为「在缓存区溢出时」。它的作用是指定缓存区满了之后,新发送的数据如何处理。
一共有三种溢出策略:

  • BufferOverflow.SUSPEND:默认的缓存溢出策略,它的作用是在缓存区满了之后,挂起发射函数,直至缓存区中的数据被消费后,再恢复。

  • BufferOverflow.DROP_OLDEST。它的作用是缓存区满了之后,丢弃最老的数据,以腾出空间来存储新数据。

  • BufferOverflow.DROP_LATEST。它的作用是缓存区满了之后,丢弃新来的数据。

举个例子:

runBlocking {
    val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.SUSPEND)
    val job1 = launch {
        data.collect {
            delay(200)
            println("received $it")
        }
    }
    launch {
        repeat(5) {
            println("emit: $it")
            data.emit(it)
            println("$it emitted")
        }
    }
    delay(2000)
    job1.cancel()
}

在这个例子中,缓存区大小为 2,缓存溢出策略为 BufferOverflow.SUSPEND,当缓存区满后,发射函数会挂起。我们开启了一个协程,连续发射了 5 个数据。但接收者每隔 200ms 才接收一个数据,显然消费速率较慢。
运行程序,输出如下:

emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
received 0
3 emitted
emit: 4
received 1
4 emitted
received 2
received 3
received 4

可以看到,SharedFlow 在发射前 3 个数据时没有任何停顿,emit 3 的时候却挂起了。这是因为缓存区容量为 2,加上等待区的一个位置,一共可以存储 3 个数据。0,1,2 三个数字发出后,缓存区就满了。
直到 0 被收集处理后,3 才发射出去。最终每个数据都被接收到了。

如果将缓存溢出策略改成 BufferOverflow.DROP_OLDEST 会如何呢?

runBlocking {
    val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.DROP_OLDEST)
    val job1 = launch {
        data.collect {
            delay(200)
            println("received $it")
        }
    }
    launch {
        repeat(5) {
            println("emit: $it")
            data.emit(it)
            println("$it emitted")
        }
    }
    delay(2000)
    job1.cancel()
}

运行程序,输出如下:
emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
3 emitted
emit: 4
4 emitted
received 3
received 4

可以看到,只有最后的 3 和 4 被接收到了。

如果换成 BufferOverflow.DROP_LATEST:

runBlocking {
    val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.DROP_LATEST)
    ...
}

运行程序,输出如下:
emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
3 emitted
emit: 4
4 emitted
received 0
received 1

可以看到,新的数据被丢弃了,只接收到了最开始的 0 和 1

四、通过 shareIn 将冷流转换成 SharedFlow

普通的 Flow 可以通过 shareIn 转换成 SharedFlow,我们来看一个例子:

runBlocking {
    val flow = flow {
        var i = 0
        while (true) {
            delay(200)
            println("emit $i")
            emit(i++)
        }
    }
    val sharedFlow = flow.shareIn(
        CoroutineScope(Dispatchers.IO),
        SharingStarted.Eagerly,
        0
    )
    val job = launch {
        delay(1000)
        println("subscribe")
        sharedFlow.collect {
            println("received $it")
        }
    }
    delay(2000)
    job.cancel()
    println("cancel")
    delay(2000)
    println("done")
}

在这个例子中,我们构建了一个间隔 200ms 发射一个数据的 Flow。然后通过 shareIn 函数将其转换为 SharedFlow。最后开启一个协程,延迟 1s 后接收 SharedFlow 的数据。
运行程序,输出如下:

emit 0
emit 1
emit 2
emit 3
subscribe
emit 4
received 4
emit 5
received 5
emit 6
received 6
emit 7
received 7
emit 8
received 8
cancel
emit 9
emit 10
emit 11
emit 12
emit 13
emit 14
emit 15
emit 16
emit 17
emit 18
done

可以看到,Flow 一直在不断地发射数据,当出现订阅者时,订阅者正常接收到了 Flow 发射的数据。
2s 后,订阅者被 cancel,不再接收数据,但 Flow 仍然在不断地发射数据。
无论有没有订阅者,Flow 都保持活跃,这种特性是由 shareIn 中传入的第二个参数 SharingStarted.Eagerly 决定的。

shareIn 函数需要三个参数。第一个用来指定协程作用域,第二个用来指定启动策略,第三个用来指定需要多少个粘性数据。

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    ...
}

一共有三种启动策略:

  • SharingStarted.Eagerly 饿汉式。立即启动 SharedFlow,不管有没有订阅者,上游的 Flow 都会保持活跃状态。

  • SharingStarted.Lazily 懒汉式。直到有订阅者出现时,才启动上游 Flow,Flow 启动后会一直保持活跃状态。

  • SharingStarted.WhileSubscribed 节能式。当有订阅者出现时,启动上游 Flow,当没有订阅者时,立即停止上游 Flow。

注:这三种策略的中文名是笔者胡诌的,建议读者使用英文名交流。

如果我们将上文中的例子换成 SharingStarted.Lazily:

runBlocking {
    val flow = flow {
        var i = 0
        while (true) {
            delay(200)
            println("emit $i")
            emit(i++)
        }
    }
    val sharedFlow = flow.shareIn(
        CoroutineScope(Dispatchers.IO),
        SharingStarted.Lazily,
        0
    )
    val job = launch {
        delay(1000)
        println("subscribe")
        sharedFlow.collect {
            println("received $it")
        }
    }
    delay(2000)
    job.cancel()
    println("cancel")
    delay(2000)
    println("done")
}

运行结果如下:
subscribe
emit 0
received 0
emit 1
received 1
emit 2
received 2
emit 3
received 3
cancel
emit 4
emit 5
emit 6
emit 7
emit 8
emit 9
emit 10
emit 11
emit 12
emit 13
done

可以看到,在 subscribe 之前,Flow 并没有发射数据。
如果我们将启动策略改成 SharingStarted.WhileSubscribed():

runBlocking {
    ...
    val sharedFlow = flow.shareIn(
        CoroutineScope(Dispatchers.IO),
        SharingStarted.WhileSubscribed(),
        0
    )
    ...
}

运行程序,输出如下:
subscribe
emit 0
received 0
emit 1
received 1
emit 2
received 2
emit 3
received 3
cancel
done

可以看到,在订阅者 cancel 后,上游的 Flow 立即停止了发送数据。在 Flow 停止后,如果有订阅者再次出现,则 Flow 又会从头开始运行。

五、小结

本文我们介绍了SharedFlow,它可以用于发送广播,通过 replay 控制保留多少粘性数据。

replay + extraBufferCapacity 决定了其缓存区容量。通过缓存溢出策略可以指定其缓存区满了之后的处理方式,

共有三种缓存溢出策略:

  • BufferOverflow.SUSPEND
  • BufferOverflow.DROP_OLDEST
  • BufferOverflow.DROP_LATEST

通过 shareIn 可以将冷流转换为 SharedFlow,转换时可以指定其启动策略。共有三种启动策略:SharingStarted.Eagerly,SharingStarted.Lazily,SharingStarted.WhileSubscribed。

原文
https://juejin.cn/post/7091306876431761415

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容