目录
深入学习Kotlin之Flow(一),什么是Flow?Flow的基本使用)
深入学习Kotlin之Flow(二),Flow的操作符,以及协程的背压
前言
在DataStore里面有提到 DataStore是基于协程
与Flow
实现的,那么什么是Flow呢?
什么是Flow
Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步流
类似 RxJava 的 Observable 、 Flowable 等等,所以很多人都用 Flow 与 RxJava 做对比。
Flow 相比于 RxJava 简单的太多了,你还记得那些 RxJava 傻傻分不清楚的操作符吗 Observable 、 Flowable 、 Single 、 Completable 、 Maybe 等等。
Flow解决了什么问题
-
LiveData 是一个生命周期感知组件,最好在 View 和 ViewModel 层中使用它,如果在 Repositories 或者 DataSource 中使用会有几个问题
- 它不支持线程切换,其次不支持背压,也就是在一段时间内发送数据的速度 > 接受数据的速度,LiveData 无法正确的处理这些请求
- 使用 LiveData 的最大问题是所有数据转换都将在主线程上完成
- RxJava 虽然支持线程切换和背压,但是 RxJava 那么多傻傻分不清楚的操作符,实际上在项目中常用的可能只有几个例如
Observable
、Flowable
、Single
等等,如果我们不去了解背后的原理,造成内存泄露是很正常的事,大家可以从 StackOverflow 上查看一下,有很多因为 RxJava 造成内存泄露的例子
- RxJava 入门的门槛很高,学习过的朋友们,我相信能够体会到从入门到放弃是什么感觉
相比之下Flow有了很不错的优点:
- Flow 支持线程切换、背压
- Flow 入门的门槛很低,没有那么多傻傻分不清楚的操作符
- 简单的数据转换与操作符,如 map 等等
- Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性
- 易于做单元测试
- 解决回调地狱的问题
Flow的基本使用
Flow能够返回多个异步值
fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}
fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
打印结果:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
其中 Flow 接口,只有一个 collect 函数
如果熟悉 RxJava 的话,则可以理解为 collect() 对应subscribe(),而 emit() 对应onNext()。
创建 Flow
创建Flow有几种方式:
flow builder 通过
flow { ... }
(上述例子就是)flowOf()
flowOf(1,2,3,4,5)
.onEach {
delay(100)
}
.collect{
println(it)
}
- asFlow()
listOf(1, 2, 3, 4, 5).asFlow()
.onEach {
delay(100)
}.collect {
println(it)
}
- channelFlow
{
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
println(it)
}
}
最后的 channelFlow builder 跟 flow builder 是有一定差异的。
flow 是 Cold Stream
,在没有切换线程的情况下,生产者和消费者是同步非阻塞的。
channel 是 Hot Stream
,而 channelFlow 实现了生产者和消费者异步非阻塞模型。
关于Cold Stream
与Hot Stream
我们后续会讲
切换线程
相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn,下面的例子中,展示了 flow builder 和 map 操作符都会受到 flowOn 的影响
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。
例如,下面的代码 collect() 则是在 main 线程:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
运行结果:
main: 1
main: 4
main: 9
main: 16
main: 25
flow 取消
如果 flow 是在协程被挂起了,那么 flow 是可以被取消的,否则不能取消。
fun main() = runBlocking {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
}
println("Done")
}
运行结果:
1
2
Done