异步挂起函数能够返回单一值,那么我们如何返回多个异步计算的值呢?而这个就是Kotlin Flow需要解决地。
Representing multiple values
在kotlin,多个值可以由Collections表示。
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
以上代码输出如下:
1
2
3
Sequences
如果我们使用CPU消费型阻塞代码生产numbers,我们可以使用Sequences表示这些numbers。
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
以上代码会每隔100ms,打印出一个数字。
Suspending functions
以上代码会阻塞主线程,我们可以给函数添加suspend修饰符来实现异步计算。
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
使用List<Int>返回类型意味着,我们需要一次性返回所有值。为了表示异步计算的值的流,我们可以使用Flow<Int>,就像之前使用Sequence<Int>同步计算值一样。
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
输出如下:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
Flow和之前例子的差别:
- Flow 类型 builder 函数称为 flow
- flow{}内部代码是可以挂起的
- 函数 foo()不再需要suspend修饰符
- 使用emit函数发射值
- 使用collect函数收集值
我们可以使用Thread.Sleep 来替换 delay,那么当前Main Thread就会被阻塞
Flows are cold
Flow是和sequence一样的code stream,在flow内的代码块只有到flow开始被collected时才开始运行;
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
输出如下:
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
这个是返回flow的foo()函数不需要被标记为suspend的关键理由.foo()函数会马上返回不会有任何等待,flow每次都是在collect调用之后才开始执行,这就是为什么我们在调用collect之后才打印出来 "Flow started"。
Flow cancellation
Flow坚持使用通用的协作式协程取消方式。flow底层并没有采用附加的取消点。对于取消这是完全透明的。和往常一样,如果flow是在一个挂起函数内被挂起了,那么flow collection是可以被取消的,并且在其他情况下是不能被取消的。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
输出如下:
Emitting 1
1
Emitting 2
2
Done
Flow builders
在上述例子中,flow { }构造者是最简单的。以下还有更简单的实现:
- flowOf定义了一个flow用来emit一个固定的集合;
- 各种collections和sequences都可以通过asFlow()函数来转换为flows;
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
}
Intermediate flow operators
Flows可以通过操作符来进行转换,就如同你使用collections和sequences一样。中间操作符用来应用于上游flow任何产生下游flow。这些操作符都是冷启动的,就像flows一样。对于这些操作符的调用也都不是挂起函数。它工作很快,返回新的转换的flow的定义。
基础操作符也有着和map和filter类似的名字。和sequences一个很重要的差别是在这些操作符内部可以调用挂起函数。
举个例子,一个请求flow可以通过map操作符映射为result,即便这个请求是在一个挂起函数内需要长时间的运行。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
输出如下:
response 1
response 2
response 3
Transform operator
在flow转换操作符中,最经常使用的就是transform,它可以用来模拟简单转换,就和 map和filter一样,也可以用来实现更加复杂地转换。可以每次emit任意数量地任意值。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
Size-limiting 中间操作符会比如take会取消flow继续执行,当设置地limit已经达到了设定值。协程取消总是会抛出一个异常,所以所有的资源管理函数对于取消操作都会添加比如try{}finally{}代码块。
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
1
2
Finally in numbers
Terminal flow operators
对于flows的末端操作符都是开始flow采集的挂起函数。collect操作符是最基础的,除此以外还有其他操作符:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
}
55
Flows are sequential
每一个flow集合都是顺序执行,除非应用了某个特定地针对多个flow执行地操作符。每一个值都是经过中间操作符,从上游到达下游,最终到达末端操作符。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
flow collection总是会在调用者协程发生。举个例子,如果有一个foo flow,那么以下代码会在作者指定地上下文中执行,而不用去看foo flow的具体执行细节。
withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}
因此,flow{}内的代码是运行在collector指定的上下文中。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
因为foo().collect是在主线程中被调用的。foo的flow内部代码也是在主线程中执行。这是个完美地实现,解决快速运行或者异步代码,不用关心执行环境和阻塞调用者线程。
Wrong emission withContext
然而,长时间运行CPU消费型任务需要在Dispatchers.Default中执行,UI更新代码需要在Dispatchers.Main中执行。通常,withContext用来切换kotlin 协程的上下文。但是flow{}内部的代码必须要尊重上下文保留属性,并且不允许从不同的上下文emit值。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn operator
以下展示正确切换flow上下文的方式:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
flowOn改变了flow本身的顺序执行上下文。collection发生在协程"coroutine#1"中,emission发生在协程"coroutine#2"并且是运行咋另一个线程中,和collect操作是同时进行地。当必须要为上下文改变CoroutineDispatcher时,flowOn操作符就会为上游flow创建一个新协程。
Buffering
在不同协程中运行flow的不同部分,在整体立场上是非常有帮助的,特别是涉及到长时间运行的异步操作。举个例子,当foo()的flow的emission操作比较慢,比如没100ms生产一个一个element,并且collector也比较慢,花费300ms去处理一个元素。让我门看看一个处理三个数字的flow需要多少时间:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
输出如下,整个collection操作需要花费大概1200ms。
1
2
3
Collected in 1220 ms
我们可以使用 buffer操作符去并发执行foo()方法里的emit代码段,然后顺序执行collection操作。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
上述方式会更快生产numbers,因为我们有效创建了处理流程,只需要在第一个数字上等待100ms,然后每个数字都花费300ms去做处理。这样方式会花费大概1000ms。
1
2
3
Collected in 1071 ms
注意: flowOn操作符使用了相同的缓存机制,但是它必须切换CoroutineDispatcher,但是在这里,我们显示请求了buffer而不是切换执行上下文。
Conflation
当一个flow表示操作的部分结果或者操作状态更新,它可能并不需要去处理每一个值,但是需要处理最近的一个值。在这种场景下, conflate操作符可以被用于忽略中间操作符,当collector处理太慢。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
从上面可以看出来,第一个数字会被处理,第二第三个也会被处理,而第二个数字会被合并,只有第三个数字发送给collector进行处理。
1
3
Collected in 758 ms
Processing the latest value
Conflation是一种对emitter和collector慢处理的一种加速方式。它通过丢弃一些值来做实现。另外一种方式就是通过取消一个慢处理collector然后重启collector接受已经发射的新值。有一族的xxxlatest操作符来执行和xxx操作符同样的基本逻辑。取消emit新值的代码块内的代码。我们把上个例子中的conflate改成collectlatest。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
因为collectLatest花费来300ms,每次发送一个新值都是100ms。我们看见代码块都是运行在新值上,但是只会以最新值完成。
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
Composing multiple flows
有不少方式来组合多个flow。
就像kotlin标准库内的Sequence.zip扩展函数一样,flows有zip操作符来组合不同的flows的值。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
}
1 -> one
2 -> two
3 -> three
Combine
当flow表示操作或者变量的最近值,它可能需要去执行计算根据所依赖的相应的flow的最近的值,并且会重新计算,当上游flow发射新值的时候。相应的操作符族被称作combine。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
Flattening flows
Flows表示异步值接收序列,它会很容易地触发请求另一个sequence值。比如,以下例子返回了两个500ms间隔字符串地flow。
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
现在我们有三个整型的flow,我们调用requestFlow来请求值。
(1..3).asFlow().map { requestFlow(it) }
现在我们结束flows的每一个flow,需要将其扁平化为一个flow来进行进一步地处理。Collections和sequences有 flatten和flatMap操作符,由于flow地异步性质,它会调用不同地flattening地模型,因此,flows有一族的flattening操作符。
flatMapConcat
连接模式由flatMapConcat和flattenConcat操作符实现。 它们和sequence是最相似的。在收集新值之前,它们会等待内部flow完成,如同下面的例子描述地一样:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
可以看出来 flatMapConcat的顺序性质
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
另一个flattening mode是并发收集flows并且将合并它们的值为一个单一flow,因此发射地值会尽快被处理。这些由flatMapMerge和flattenMerge来实现。它们都有一个可选的concurrency参数来限制并发同时进行收集的flow个数。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
可以看出来flatMapMerge并发特性:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
可以看出来flatMapMerge是顺序调用内部代码块(在这个例子中是{ requestFlow(it) } )但是并发收集结果flows,它等价于顺序执行map { requestFlow(it) },然后对于结果调用flattenMerge。
flatMapLatest
flatMapLatest和collectLatest操作符很像,在"Processing the latest value"章节有介绍,里面有关于"Latest"模式的介绍,只有新flow发射了了新值,那么上个flow就会被取消,由 flatMapLatest实现。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
flatMapLatest取消了代码快内所有代码(在这个例子中是{ requestFlow(it) })当flow发射新值。在这个特定例子中没有差别,因为对requestFlow的调用是非常快的,非挂起也不能取消。如果我们使用挂起函数,比如delay就会按照期望的来显示。
Flow exceptions
如果在emitter内部或者操作符内部抛出一个异常,flow collection也是可以正常完成。也有好几种处理异常的方式。
Collector try and catch
在collector内部使用try/catch
代码块来处理异常。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
这段代码成功捕捉了 collect 末端操作符内的异常,在抛出异常之后就没有再发送新值。
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
Everything is caught
上个例子确实捕捉了emitter,中间操作符,末端操作符内的异常。我们修改以下代码,将emitter发射值通过mapped修改为strings,但是除了异常之外。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
Exception transparency
但是怎么样才能将封装处理emitter代码异常处理逻辑呢?
Flow必须对异常处理透明,在flow{}内使用try/catch违背了异常处理透明化原则。在上述例子中,保证了从collector抛出异常也能在try/catch内捕获。
emitter可以使用catch操作符来实现异常透明化处理,运行异常处理封装。catch操作符可以分析异常,根据不同的异常作出相应处理。
- 可以使用throw再次抛出异常
- 异常可以在catch内转换为发射值
- 异常可以被忽略、打印、或者由其他逻辑代码进行处理
举个例子,我们在catch异常之后再发射一段文本:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
Transparent catch
catch中间操作符,履行了异常透明化原则,只是捕捉上游异常(仅仅会捕获catch操作符以上的异常,而不会捕获catch以下的异常),如果是在collect{}代码块内的异常则会逃逸。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
Catching declaratively
我们可以结合 catch操作符属性然后如果要求去处理所有异常的话就可以把 collect函数体内的逻辑转移到onEach并且放到catch操作符前面去。这样的Flow Collection 必须由对collect的调用触发并且collect方法没有参数。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
}
现在我们可以看见"Caught …"消息打印出来,现在就可以捕获所有异常而不用显示编写try/catch代码块。
Flow completion
在flow collection完成之后(正常完成或者异常完成)可能需要执行一个操作,可以通过两种方式来完成: imperative 或 declarative。
Imperative finally block
对于try/catch,collector也可以通过使用finally来执行完成操作。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
1
2
3
Done
Declarative handling
对于declarative方式,flow有一个onCompletion中间操作符,在flow完成collect操作后就会被调用。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
}
对于 onCompletion的关键性优点则是可以为空的Throwable参数,可以以此判断flow是正常完成还是异常完成,在以下例子中,foo() flow在emit数字1之后抛出来异常。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
输入如下
1
Flow completed exceptionally
Caught exception
onCompletion并不像catch一样,它不会处理异常。正如我们上面看见的一样,异常依然向下游流动,被进一步分发到onCompletion操作符,也可以在catch操作符内做处理。
Upstream exceptions only
和catch操作符一样,onCompletion只会看见来自上游的异常,不能看见下游异常。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
Imperative versus declarative
现在我们知道如何去收集flow,处理completion和exceptions以imperative和declarative方式。
Launching flow
在一些数据源,使用flows来表示异步事件是非常简单地。我们需要和addEventListener一样的类似处理方式,注册一段代码用来处理新事件和进行进一步的工作。onEach正好能服务这一角色。然而,onEach是一个中间操作符。我们也需要一个末端操作符来收集操作符。其他情况下调用onEach则毫无影响。
如果我们在 collect末端操作符之前使用onEach,在那之后的代码将会等待直到flow收集完成。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
Event: 1
Event: 2
Event: 3
Done
launchIn操作符是很方便地。使用collect来替换launchIn,我们就可以在一个独立协程中执行flow采集,所以后面的代码就会马上执行。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
launchIn参数可以指定一个 CoroutineScope,用来表示flow收集的协程所在的作用域。在上述例子中,这个作用域来自于 runBlocking协程构造器,runBlocking作用域会等待内部子作用域完成,在这例子中保持主函数的返回和介绍。
在实际应用程序中,一个作用域会来自一个有限生命周期的实体。只要这个实体的生命周期终结了,那么相应的作用域也会被取消,取消相应flow的采集工作。在这种方式下,使用onEach { ... }.launchIn(scope)和addEventListener就很类似,然而,没有必要直接调用removeEventListener,因为取消操作和结构化并发会自动完成这个操作。
请注意,launchIn会返回 Job,在没有取消整个作用域或join时,job可以用来cancel相应的flow收集协程。