一、通过 SharedFlow 实现广播
通过 SharedFlow 可以实现类似广播的功能:
runBlocking {
val data = MutableSharedFlow<Int>()
val job1 = launch {
data.collect {
println("first : received $it")
}
}
val job2 = launch {
data.collect {
println("second: received $it")
}
}
delay(1000)
data.emit(1)
delay(1000)
job1.cancel()
job2.cancel()
}
运行程序,输出如下:
first : received 1
second: received 1
需要注意的是,SharedFlow 默认无法收到 collect 前发射的值,在本例中,如果把 data.emit(1) 前的 delay(1000) 去掉,则有概率接收不到发射的 1,这是因为协程的启动需要时间,如果 在协程启动之前,emit(1) 就被调用了,那么协程启动后,就会收不到之前发射的值。
二、通过 replay 指定保留多少粘性数据
SharedFlow 的构造器中,有一个 replay 参数,它就是用来实现粘性数据的。它的意义是接收方 collect() 时,将多少个之前缓存的数据发送给接收方。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
SharedFlow 的 replay 默认是 0,也就是默认无粘性。
接下来我们将 replay 设置为 2,测试一下:
runBlocking {
val data = MutableSharedFlow<Int>(2)
data.emit(1)
data.emit(2)
val job1 = launch {
data.collect {
println("first : received $it")
}
}
val job2 = launch {
data.collect {
println("second: received $it")
}
}
delay(1000)
job1.cancel()
job2.cancel()
}
将 replay 设置成 2 后,相当于 SharedFlow 拥有了一个容量为 2 的缓存区。当有新订阅者出现时,SharedFlow 会把缓存的元素发送给订阅者。
运行程序,输出如下:
first : received 1
first : received 2
second: received 1
second: received 2
可以看到,虽然 collect() 在 emit() 之后,但由于缓存区的存在,SharedFlow 发出的数据还是被接收到了。
三、SharedFlow 的缓存区
SharedFlow 的构造函数中,第二个参数名为 extraBufferCapacity,译为「额外的缓存容量」。它的作用是处理背压。当下游消费速率过低时,数据会被发送到缓存区中。
所以,SharedFlow 的缓存区大小为 replay + extraBufferCapacity。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
SharedFlow 的构造函数中,第三个参数名为 onBufferOverflow,译为「在缓存区溢出时」。它的作用是指定缓存区满了之后,新发送的数据如何处理。
一共有三种溢出策略:
BufferOverflow.SUSPEND:默认的缓存溢出策略,它的作用是在缓存区满了之后,挂起发射函数,直至缓存区中的数据被消费后,再恢复。
BufferOverflow.DROP_OLDEST。它的作用是缓存区满了之后,丢弃最老的数据,以腾出空间来存储新数据。
BufferOverflow.DROP_LATEST。它的作用是缓存区满了之后,丢弃新来的数据。
举个例子:
runBlocking {
val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.SUSPEND)
val job1 = launch {
data.collect {
delay(200)
println("received $it")
}
}
launch {
repeat(5) {
println("emit: $it")
data.emit(it)
println("$it emitted")
}
}
delay(2000)
job1.cancel()
}
在这个例子中,缓存区大小为 2,缓存溢出策略为 BufferOverflow.SUSPEND,当缓存区满后,发射函数会挂起。我们开启了一个协程,连续发射了 5 个数据。但接收者每隔 200ms 才接收一个数据,显然消费速率较慢。
运行程序,输出如下:
emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
received 0
3 emitted
emit: 4
received 1
4 emitted
received 2
received 3
received 4
可以看到,SharedFlow 在发射前 3 个数据时没有任何停顿,emit 3 的时候却挂起了。这是因为缓存区容量为 2,加上等待区的一个位置,一共可以存储 3 个数据。0,1,2 三个数字发出后,缓存区就满了。
直到 0 被收集处理后,3 才发射出去。最终每个数据都被接收到了。
如果将缓存溢出策略改成 BufferOverflow.DROP_OLDEST 会如何呢?
runBlocking {
val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.DROP_OLDEST)
val job1 = launch {
data.collect {
delay(200)
println("received $it")
}
}
launch {
repeat(5) {
println("emit: $it")
data.emit(it)
println("$it emitted")
}
}
delay(2000)
job1.cancel()
}
运行程序,输出如下:
emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
3 emitted
emit: 4
4 emitted
received 3
received 4
可以看到,只有最后的 3 和 4 被接收到了。
如果换成 BufferOverflow.DROP_LATEST:
runBlocking {
val data = MutableSharedFlow<Int>(0, 2, BufferOverflow.DROP_LATEST)
...
}
运行程序,输出如下:
emit: 0
0 emitted
emit: 1
1 emitted
emit: 2
2 emitted
emit: 3
3 emitted
emit: 4
4 emitted
received 0
received 1
可以看到,新的数据被丢弃了,只接收到了最开始的 0 和 1。
四、通过 shareIn 将冷流转换成 SharedFlow
普通的 Flow 可以通过 shareIn 转换成 SharedFlow,我们来看一个例子:
runBlocking {
val flow = flow {
var i = 0
while (true) {
delay(200)
println("emit $i")
emit(i++)
}
}
val sharedFlow = flow.shareIn(
CoroutineScope(Dispatchers.IO),
SharingStarted.Eagerly,
0
)
val job = launch {
delay(1000)
println("subscribe")
sharedFlow.collect {
println("received $it")
}
}
delay(2000)
job.cancel()
println("cancel")
delay(2000)
println("done")
}
在这个例子中,我们构建了一个间隔 200ms 发射一个数据的 Flow。然后通过 shareIn 函数将其转换为 SharedFlow。最后开启一个协程,延迟 1s 后接收 SharedFlow 的数据。
运行程序,输出如下:
emit 0
emit 1
emit 2
emit 3
subscribe
emit 4
received 4
emit 5
received 5
emit 6
received 6
emit 7
received 7
emit 8
received 8
cancel
emit 9
emit 10
emit 11
emit 12
emit 13
emit 14
emit 15
emit 16
emit 17
emit 18
done
可以看到,Flow 一直在不断地发射数据,当出现订阅者时,订阅者正常接收到了 Flow 发射的数据。
2s 后,订阅者被 cancel,不再接收数据,但 Flow 仍然在不断地发射数据。
无论有没有订阅者,Flow 都保持活跃,这种特性是由 shareIn 中传入的第二个参数 SharingStarted.Eagerly 决定的。
shareIn 函数需要三个参数。第一个用来指定协程作用域,第二个用来指定启动策略,第三个用来指定需要多少个粘性数据。
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
...
}
一共有三种启动策略:
SharingStarted.Eagerly 饿汉式。立即启动 SharedFlow,不管有没有订阅者,上游的 Flow 都会保持活跃状态。
SharingStarted.Lazily 懒汉式。直到有订阅者出现时,才启动上游 Flow,Flow 启动后会一直保持活跃状态。
SharingStarted.WhileSubscribed 节能式。当有订阅者出现时,启动上游 Flow,当没有订阅者时,立即停止上游 Flow。
注:这三种策略的中文名是笔者胡诌的,建议读者使用英文名交流。
如果我们将上文中的例子换成 SharingStarted.Lazily:
runBlocking {
val flow = flow {
var i = 0
while (true) {
delay(200)
println("emit $i")
emit(i++)
}
}
val sharedFlow = flow.shareIn(
CoroutineScope(Dispatchers.IO),
SharingStarted.Lazily,
0
)
val job = launch {
delay(1000)
println("subscribe")
sharedFlow.collect {
println("received $it")
}
}
delay(2000)
job.cancel()
println("cancel")
delay(2000)
println("done")
}
运行结果如下:
subscribe
emit 0
received 0
emit 1
received 1
emit 2
received 2
emit 3
received 3
cancel
emit 4
emit 5
emit 6
emit 7
emit 8
emit 9
emit 10
emit 11
emit 12
emit 13
done
可以看到,在 subscribe 之前,Flow 并没有发射数据。
如果我们将启动策略改成 SharingStarted.WhileSubscribed():
runBlocking {
...
val sharedFlow = flow.shareIn(
CoroutineScope(Dispatchers.IO),
SharingStarted.WhileSubscribed(),
0
)
...
}
运行程序,输出如下:
subscribe
emit 0
received 0
emit 1
received 1
emit 2
received 2
emit 3
received 3
cancel
done
可以看到,在订阅者 cancel 后,上游的 Flow 立即停止了发送数据。在 Flow 停止后,如果有订阅者再次出现,则 Flow 又会从头开始运行。
五、小结
本文我们介绍了SharedFlow,它可以用于发送广播,通过 replay 控制保留多少粘性数据。
replay + extraBufferCapacity 决定了其缓存区容量。通过缓存溢出策略可以指定其缓存区满了之后的处理方式,
共有三种缓存溢出策略:
- BufferOverflow.SUSPEND,
- BufferOverflow.DROP_OLDEST,
- BufferOverflow.DROP_LATEST。
通过 shareIn 可以将冷流转换为 SharedFlow,转换时可以指定其启动策略。共有三种启动策略:SharingStarted.Eagerly,SharingStarted.Lazily,SharingStarted.WhileSubscribed。