阅读过实战系列三的文章的同学可能有印象,我有提到过Kotlin Flow和Room数据库能很好的配合使用。此外,不久前官方推出了DataStore也是基于Kotlin Flow,所以本节我们先来了解下Kotlin Flow的使用方法。
Flow 引入的场景
Kotlin的官方解释引入Flow的场景是:
Return multiple asynchronously computed values over time
这句话很重要,它包括几个关键含义:
- 返回值有多个
- 返回值是分开的
- 返回的是异步结果
接下来我们看看单个含义在Kotlin中如何实现:
- 返回多个值 - Collections
返回多个值我们能很容易的想到使用Collections。例如一个
返回List
的listExample
的函数。
fun listExample(): List<Int> {
return listOf(1, 2, 3)
}
listExample().forEach { value -> println(value) }
// 1, 2, 3
- 值是分开返回的
上面的情况多个值是作为一个整体一起返回的,如果让多个值分开返回就得使用Kotlin的另外一个数据类型Sequences。
fun sequenceExample(): Sequence<Int> {
return sequence {
Thread.sleep(1000) // 假装在执行耗时操作
yield(1) // 返回值
Thread.sleep(1000)
yield(2)
Thread.sleep(1000)
yield(3)
}
}
sequenceExample().forEach { value ->
println(value)
println(Date().time)
}
// 结果:
I/System.out: 1
I/System.out: 1605153441181
I/System.out: 2
I/System.out: 1605153442182
I/System.out: 3
I/System.out: 1605153443185
结果可以看出,通过使用sequence可以实现多个结果分批次单个数值的返回。
- 异步返回
异步返回可以使用susupend函数。
suspend fun simpleAsyn(): List<Int> {
delay(1000) // 假装在执行异步耗时操作
return listOf(1, 2, 3)
}
- Flow实现Return multiple asynchronously computed values over time
fun simpleFlow(): Flow<Int> {
return flow {
delay(1000) // 假装在执行异步耗时操作
emit(1) // 返回值
delay(1000)
emit(2)
delay(1000)
emit(3)
}
}
simpleFlow().collect { value -> println(value) }
// 1, 2, 3
结论:Flow可以简单的理解为异步的Sequence。
Flow 的特性
Flow可以理解成异步的Sequence,也具有和Sequence类似的一些特性。
- 延迟执行
我们的代码如下,打印结果出来可能会比较费解。
val streamOfInts: Flow<Int> = flow {
println("开始发送===>")
for (i in 1..3) {
delay(100)
println("发送初始值===>$i")
emit(i)
}
println("结束发送===>")
}
streamOfInts.map { value -> value * value }
println("Flow完成===>")
// 打印结果:
println("Flow完成===>")
为什么呢? 因为Flow是惰性的,streamOfInts这个Flow没有执行结束操作函数,Flow中的各种中间操作函数不会立即执行
结束操作函数 --- Flow执行函数后的返回结果不是Flow,那这个函数就是结束操作函数
中间操作函数 --- Flow执行函数后的返回结果仍然是Flow
我们接下来修改代码,执行collect这个函数,它的意义是接收Flow传过来的值:
val streamOfInts: Flow<Int> = flow {
println("开始发送===>")
for (i in 1..3) {
delay(100)
println("发送初始值===>$i")
emit(i)
}
println("结束发送===>")
}
streamOfInts.map { value -> value * value }.collect { println("最终收到 ===> $it") }
// 打印结果:
I/System.out: 开始发送===>
I/System.out: 发送初始值===>1
I/System.out: 最终收到 ===> 1
I/System.out: 发送初始值===>2
I/System.out: 最终收到 ===> 4
I/System.out: 发送初始值===>3
I/System.out: 最终收到 ===> 9
I/System.out: 结束发送===>
I/System.out: Flow完成===>
熟悉RxJava的同学是不是似曾相识的感觉,没错Flow就是冷信号,只有被订阅后才会发送值。
- 顺序执行
上面的结果我们能看出来,分别对每个值进行了所有的操作后才进行下一个值的处理,即先处理完1,再处理2,再处理3。
1 - 1 - 2 - 4 - 3 - 9
Flow的构造函数
- 通过
flowOf
函数
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
-
Iterable调用
asFlow
函数
listOf(1, 2, 3).asFlow()
// 1, 2, 3
- 无参但是有返回值的函数(() -> T)调用
asFlow
函数
fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
- 无参但是有返回值的挂起函数(() -> T)调用
asFlow
函数
suspend fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
-
Array调用
asFlow
函数
arrayOf(1,2,3).asFlow()
// 1, 2, 3
-
LongRange调用
asFlow
函数
LongRange(1, 5).asFlow().collect { value -> println(value) }
// 1, 2, 3, 4, 5
- Empty Flow
emptyFlow<String>()
Flow中间运算函数
Delay相关的运算函数
debounce
- 如果两个相邻的值生产出来的时间间隔超过了
[timeout]
毫秒,就忽过滤掉前一个值- 最后一个值不受影响,总是会被释放
emit
。
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000)
// 结果:1 4
// 解释:
// 2和1的间隔大于2000,1被释放
// 3和2的间隔小于2000, 2被忽略
// 4和3的间隔小于2000, 3被忽略
// 4是最后一个值不受timeout值的影响, 4被释放
[timeout]
可以传毫秒,也可以传Duration
注意:还是个实验性的方法,使用的时候需要加上
@ExperimentalTime
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000.milliseconds)
// 结果:1 4
Distinct相关的运算函数
distinctUntilChanged
- 如果生产的值和上个发送的值相同,值就会被过滤掉
flow {
emit(1)
emit(1)
emit(2)
emit(2)
emit(3)
emit(4)
}.distinctUntilChanged()
// 结果:1 2 3 4
// 解释:
// 第一个1被释放
// 第二个1由于和第一个1相同,被过滤掉
// 第一个2被释放
// 第二个2由于和第一个2相同,被过滤掉
// 第一个3被释放
// 第一个4被释放
- 可以传参
(old: T, new: T) -> Boolean
,进行自定义的比较
private class Person(val age: Int, val name: String)
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }
// 结果:张三 李四 赵六
// 解释:本例子定义如果年龄相同就认为是相同的值,所以王五被过滤掉了
- 可以用
distinctUntilChangedBy
转换成年龄进行对比
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChangedBy { person -> person.age }
// 结果:张三 李四 赵六
Emitters相关的运算函数
transform
对每个值进行转换
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.transform {
if (it % 2 == 0) {
emit(it * it)
}
}
// 结果:4 16
// 解释:
// 1 不是偶数,被忽略
// 2 是偶数,2的平方4
// 3 不是偶数,被忽略
// 4 是偶数,4的平方16
onStart
第一个值被释放之前被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onStart { emit(1000) }
// 结果:1000 1 2 3 4
// 解释:
// 第一个值1被释放的时候调用了emit(1000), 所以1000在1之前被释放
onCompletion
最后一个值释放完成之后被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onCompletion { emit(1000) }
// 结果:1 2 3 4 1000
// 解释:
// 第一个值4被释放的时候调用了emit(1000), 所以1000在4之后被释放
Limit相关的运算函数
drop
忽略最开始的
[count]
个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.drop(2)
// 结果:3 4
// 解释:
// 最开始释放的两个值(1,2)被忽略了
dropWhile
判断第一个值如果满足
(T) -> Boolean
这个条件就忽略
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 == 0
}
// 结果:1 2 3 4
// 解释:
// 第一个值不是偶数,所以1被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 != 0
}
// 结果:2 3 4
// 解释:
// 第一个值是偶数,所以1被忽略
take
只释放前面
[count]
个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.take(2)
// 结果:1 2
// 解释:
// 前面两个值被释放
takeWhile
判断第一个值如果满足
(T) -> Boolean
这个条件就释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 != 0 }
// 结果:1
// 解释:
// 第一个值满足是奇数条件
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 == 0 }
// 结果:无
// 解释:
// 第一个值不满足是奇数条件
CoroutineContext相关的运算函数
flowOn
可以切换CoroutineContext
说明:flowOn只影响该运算符之前的CoroutineContext,对它之后的CoroutineContext没有任何影响
withContext(Dispatchers.Main) {
flow {
println("Thread name1 = ${Thread.currentThread().name}")
emit(1)
emit(2)
emit(3)
emit(4)
}
.map {
println("Thread name2 = ${Thread.currentThread().name}")
it * it
}
.filter {
println("Thread name3 = ${Thread.currentThread().name}")
it > 9
}
.flowOn(Dispatchers.IO)
.collect{value ->
println("Thread name4 = ${Thread.currentThread().name}")
println(value)
}
}
// 结果:
// Thread name1 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name4 = main
// 16
// 解释:
// flowOn(Dispatchers.IO)之前的flow,map,filter都是在Dispatchers.IO上执行
// flowOn(Dispatchers.IO)之后的collect则由withContext(Dispatchers.Main)决定
buffer
将flow的多个任务分配到不同的协程中去执行,加快执行的速度。
val flow1 = flow {
delay(2000) // 假设耗时任务
emit(1) // 释放值
delay(2000)
emit(2)
delay(2000)
emit(3)
delay(2000)
emit(4)
}
flow1.collect { value ->
println(value)
delay(4000)
}
// 结果
// 2020-11-16 13:48:37.144 24060-24116/com.johnny.flowdemo I/System.out: 1
// 2020-11-16 13:48:43.150 24060-24116/com.johnny.flowdemo I/System.out: 2
// 2020-11-16 13:48:49.160 24060-24116/com.johnny.flowdemo I/System.out: 3
// 2020-11-16 13:48:55.166 24060-24116/com.johnny.flowdemo I/System.out: 4
// 解释:
// 4个耗时操作每个2000毫秒,加上collect的4000毫秒,所以每个值的时间间隔是6000毫秒。
val flow2 = flow {
delay(2000) // 假设耗时任务
emit(1) // 释放值
delay(2000)
emit(2)
delay(2000)
emit(3)
delay(2000)
emit(4)
}.buffer()
flow2.collect { value ->
println(value)
delay(4000)
}
// 结果
// 2020-11-16 13:51:11.290 24253-24299/com.johnny.flowdemo I/System.out: 1
// 2020-11-16 13:51:15.293 24253-24299/com.johnny.flowdemo I/System.out: 2
// 2020-11-16 13:51:19.297 24253-24300/com.johnny.flowdemo I/System.out: 3
// 2020-11-16 13:51:23.301 24253-24300/com.johnny.flowdemo I/System.out: 4
// 解释:
// 4个耗时操作被分配到了不同的协程中执行,总共耗时了大约2000毫秒。collect收到的4个值差不多同时,所以每个值依次收到的时间间隔是4000毫秒。
conflate
如果值的生产速度大于值的消耗速度,就忽略掉中间未来得及处理的值,只处理最新的值。
val flow1 = flow {
delay(2000)
emit(1)
delay(2000)
emit(2)
delay(2000)
emit(3)
delay(2000)
emit(4)
}.conflate()
flow1.collect { value ->
println(value)
delay(5000)
}
// 结果: 1 3 4
// 解释:
// 2000毫秒后生产了1这个值,交由collect执行,花费了5000毫秒,当1这个值执行collect完成后已经经过了7000毫秒。
// 这7000毫秒中,生产了2,但是collect还没执行完成又生产了3,所以7000毫秒以后会直接执行3的collect方法,忽略了2这个值
// collect执行完3后,还有一个4,继续执行。
Flatten相关的运算函数
flatMapConcat
将原始的
Flow<T>
通过[transform]
转换成Flow<Flow<T>>
,然后将Flow<Flow<T>>
释放的Flow<T>
其中释放的值一个个释放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapConcat {
flow {
emit("$it 产生第一个flow值")
delay(2500)
emit("$it 产生第二个flow值")
}
}.collect { value ->
println(value)
}
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值
// 解释:
// 原始Flow<Int>通过flatMapConcat被转换成Flow<Flow<Int>>
// 原始Flow<Int>首先释放1,接着Flow<Flow<Int>> 就会释放 1产生第一个flow值 和 1产生第二个flow值 两个值
// Flow<Int>释放2,...
// Flow<Int>释放3,...
// Flow<Int>释放4,...
flattenConcat
和flatMapConcat类似,只是少了一步Map操作。
flow {
delay(1000)
emit(flow {
emit("1 产生第一个flow值")
delay(2000)
emit("1 产生第二个flow值") })
delay(1000)
emit(flow {
emit("2 产生第一个flow值")
delay(2000)
emit("3 产生第二个flow值") })
delay(1000)
emit(flow {
emit("3 产生第一个flow值")
delay(2000)
emit("3 产生第二个flow值") })
delay(1000)
emit(flow {
emit("4 产生第一个flow值")
delay(2500)
emit("4 产生第二个flow值") })
}.flattenConcat()
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值
flatMapMerge
将原始的
Flow<T>
通过[transform]
转换成Flow<Flow<T>>
,然后将Flow<Flow<T>>
释放的Flow<T>
其中释放的值一个个释放。它与flatMapConcat的区别是:
Flow<Flow<T>>
释放的Flow<T>
其中释放的值没有顺序性,谁先产生谁先释放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapMerge {
flow {
emit("$it 产生第一个flow值")
delay(2500)
emit("$it 产生第二个flow值")
}
}.collect { value ->
println(value)
}
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第二个flow值
// 解释:
// 原始Flow<Int>首先释放1, 第二个Flow<Flow<Int>> 释放 1产生第一个flow值,但是 1产生第二个flow值是3500毫秒才释放,2 产生第一个flow值 是2000毫秒释放, 3 产生第一个flow值 是3000毫秒释放,3500毫秒时刻才是 1产生第二个flow值 的释放
flatMapMerge
和 flattenMerge类似,只是少了一步Map操作。
merge
将
Iterable<Flow<T>>
合并成一个Flow<T>
val flow1 = listOf(
flow {
emit(1)
delay(500)
emit(2)
},
flow {
emit(3)
delay(500)
emit(4)
},
flow {
emit(5)
delay(500)
emit(6)
}
)
flow1.merge().collect { value -> println("$value") }
// 结果: 1 3 5 2 4 6
// 解释:
// 按Iterable的顺序和耗时顺序依次释放值
transformLatest
原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
flow {
emit(1)
delay(1000)
emit(2)
delay(2000)
emit(3)
delay(3000)
emit(4)
}.transformLatest { value ->
delay(2500)
emit(value * value )
}
// 结果: 9 16
// 解释:
// 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2
// 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3
// 原始Flow释放3以后,转换后的Flow有足够的时间释放9
// 原始Flow释放4以后,转换后的Flow有足够的时间释放16
flatMapLatest
和transformLatest类似, 原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
区别:flatMapLatest的
transform
转换成的是Flow<T>
, transformLatest的transform
转换成的是Unit
flow {
emit(1)
delay(1000)
emit(2)
delay(2000)
emit(3)
delay(3000)
emit(4)
}.flatMapLatest { value ->
flow {
delay(2500)
emit(value * value )
}
}
// 结果: 9 16
// 解释:
// 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2
// 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3
// 原始Flow释放3以后,转换后的Flow有足够的时间释放9
// 原始Flow释放4以后,转换后的Flow有足够的时间释放16
mapLatest
和transformLatest类似, 原始flow会触发
transformLatest
转换后的flow, 当原始flow有新的值释放后,transformLatest
转换后的flow会被取消,接着触发新的转换后的flow
区别:mapLatest的
transform
转换成的是T
,flatMapLatest的transform
转换成的是Flow<T>
,transformLatest的transform
转换成的是Unit
Transform相关的运算函数
filter
通过
predicate
进行过滤,满足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filter { it % 2 == 0 }
// 结果: 2 4
// 解释:
// 2和4满足it % 2 == 0,被释放
filterNot
通过
predicate
进行过滤,不满足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filterNot { it % 2 == 0 }
// 结果: 1 3
// 解释:
// 1和3不满足it % 2 == 0,被释放
filterIsInstance
如果是某个数据类型则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(4)
}.filterIsInstance<String>()
// 结果: "2" "3"
// 解释:
// "2" "3"是String类型,被释放
filterNotNull
如果数据是非空,则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(null)
}.filterNotNull()
// 结果: 1 "2" "3"
map
将一个值转换成另外一个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map { it * it }
// 结果: 1 4 9 16
// 解释:
// 将1,2,3,4转换成对应的平方数
mapNotNull
将一个非空值转换成另外一个值
withIndex
将值封装成IndexedValue对象
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
// 结果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)
onEach
每个值释放的时候可以执行的一段代码
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onEach { println("接收到$it") }
// 结果:
I/System.out: 接收到1
I/System.out: 1
I/System.out: 接收到2
I/System.out: 2
I/System.out: 接收到3
I/System.out: 3
I/System.out: 接收到4
I/System.out: 4
scan
有一个初始值,然后每个值都和初始值进行运算,然后这个值作为后一个值的初始值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.scan(100) { acc, value ->
acc * value
}
// 结果: 100 100 200 600 2400
// 解释:
// 初始值 100
// 1 100 * 1 = 100
// 2 100 * 2 = 200
// 3 200 * 3 = 600
// 4 600 * 4 = 2400
runningReduce
和scan类似,但是没有初始值,最开始是它本身
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.runningReduce { acc, value ->
acc * value
}
// 结果: 1 2 6 24
// 解释:
// 1 1
// 2 1 * 2 = 2
// 3 2 * 3 = 6
// 4 6 * 4 = 24
Combine相关的运算函数
任意一个flow释放值且都有释放值后会调用combine后的代码块,且值为每个flow的最新值。
val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }
flow1.combine(flow2) { first, second ->
"$first$second"
}.collect { println("$it") }
// 结果:1a 2a 2b 3b 4b 4c 4d
// 解释:
// 开始 --- flow1 释放 1,flow2 释放 a, 释放1a
// 10毫秒 --- flow1 释放 2,释放2a
// 20毫秒 --- flow2 释放 b,此时释放2b
// 30毫秒 --- flow1 释放 3,此时释放3b
// 40毫秒 --- flow1 释放 4,此时释放4b
// 40毫秒 --- flow2 释放 c,此时释放4c
// 60毫秒 --- flow2 释放 d,此时释放4d
说明:Combine能够接受的参数类型非常多,但是效果都是如上类似。
Flow结束函数
Collect相关的结束函数
collect
接收值,一直有用,无需再做介绍。
launchIn
scope.launch { flow.collect() }
的缩写, 代表在某个协程上下文环境中去接收释放的值
val flow1 = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}
flow1.onEach { println("$it") }
.launchIn(GlobalScope)
// 结果:1 2 3 4
collectIndexed
和
withIndex
对应的,接收封装的IndexedValue
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
flow1.collectIndexed { index, value ->
println("index = $index, value = $value")
}
// 结果:
// I/System.out: index = 0, value = IndexedValue(index=0, value=1)
// I/System.out: index = 1, value = IndexedValue(index=1, value=2)
// I/System.out: index = 2, value = IndexedValue(index=2, value=3)
// I/System.out: index = 3, value = IndexedValue(index=3, value=4)
collectLatest
collectLatest与collect的区别是,如果有新的值释放,上一个值的操作如果没执行完则将会被取消
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
flow1.collectLatest {
println("正在计算收到的值 $it")
delay(1500)
println("收到的值 $it")
}
// 结果:
// I/System.out: 正在计算收到的值 1
// I/System.out: 正在计算收到的值 2
// I/System.out: 正在计算收到的值 3
// I/System.out: 收到的值 3
// I/System.out: 正在计算收到的值 4
// I/System.out: 收到的值 4
// 解释:
// 1间隔1000毫秒后释放2,2间隔1000毫秒后释放3,这间隔小于需要接收的时间1500毫秒,所以当2和3 到来后,之前的操作被取消了。
// 3和4 之间的间隔够长能够等待执行完毕,4是最后一个值也能执行
Collection相关的结束函数
toList
将释放的值转换成List
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toList())
// 结果:[1, 2, 3, 4]
toSet
将释放的值转换成Set
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toSet())
// 结果:[1, 2, 3, 4]
Count相关的结束函数
count
- 计算释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count())
// 结果:4
- 计算满足某一条件的释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count { it % 2 == 0 })
// 结果:2
// 解释:
// 偶数有2个值 2 4
Reduce相关的结束函数
reduce
和runningReduce类似,但是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.reduce { acc, value -> acc * value })
// 结果:24
// 解释:计算最后的结果,1 * 2 * 3 * 4 = 24
fold
和scan类似,有一个初始值,但是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.fold(100) { acc, value -> acc * value })
// 结果:2400
// 解释:计算最后的结果,100 * 1 * 2 * 3 * 4 = 2400
single
只接收一个值的Flow
注意:多于1个或者没有值都会报错
val flow1 = flow {
emit(1)
}
println(flow1.single())
// 结果:1
singleOrNull
接收一个值的Flow或者一个空值的Flow
first/firstOrNull
- 接收释放的第一个值/接收第一个值或者空值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first())
// 结果:1
- 接收第一个满足某个条件的值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first { it % 2 == 0})
// 结果:2
Flow的错误异常处理
- 可以通过 try catch 捕获错误异常
try {
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.collect {
println("接收值 $it")
check(it <= 1) { "$it 大于1" }
}
} catch (e: Throwable) {
println("收到了异常: $e")
}
// 结果:
// I/System.out: 接收值 1
// I/System.out: 接收值 2
// I/System.out: 收到了异常: java.lang.IllegalStateException: 2 大于1
// 解释:
// 收到2的时候就抛出了异常,让后flow被取消,异常被捕获
- 通过
catch
函数
catch
函数能够捕获之前产生的异常,之后的异常无法捕获。
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map {
check(it <= 1) { "$it 大于1" }
it
}
.catch { e -> println("Caught $e") }
.collect()
// 结果:
// Caught java.lang.IllegalStateException: 2 大于1
Flow的取消
Flow的取消可以执行CoroutineScope.cancel
GlobalScope.launch {
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
flow1.collect { value ->
println("$value")
if (value >= 3) {
cancel()
}
}
}
// 结果:1 2 3
StateFlow/MutableStateFlow
我们了解了Flow是冷信号,类似于RxJava中的observables
。Kotlin还提供了一个类似于RxJava中的Subject
的StateFlow。MutableStateFlow是相当于值可变的StateFlow
它有如下几个特点:
- StateFlow主要用来管理和表示几种不同的状态
例如:网络请求中(Requesting),网络请求完成(Request Complete),网络请求失败(Request Fail)。
-
StateFlow只有值变化后才会释放新的值,和
distinctUntilChanged
类似 -
collect
对于它不是必需的,StateFlow创建的时候就能开始释放值
class CounterModel {
private val _counter = MutableStateFlow(0) // 私有使用MutableStateFlow
val counter = _counter.asStateFlow() // 对外公开只读的StateFlow
fun inc() {
_counter.value++ //更改值
}
}
val aModel = CounterModel()
val bModel = CounterModel()
val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
// 两个计时器的结果相加
本文主要讲解了Kotlin Flow
的基础知识,下一篇我们将来介绍Kotlin Flow
在UI界面编写,网络数据请求和数据库中是如何使用的。