JetPack知识点实战系列十二:Kotlin Flow基础知识详解

阅读过实战系列三的文章的同学可能有印象,我有提到过Kotlin FlowRoom数据库能很好的配合使用。此外,不久前官方推出了DataStore也是基于Kotlin Flow,所以本节我们先来了解下Kotlin Flow的使用方法。

Flow 引入的场景

Kotlin的官方解释引入Flow的场景是:

Return multiple asynchronously computed values over time

这句话很重要,它包括几个关键含义:

  1. 返回值有多个
  2. 返回值是分开的
  3. 返回的是异步结果

接下来我们看看单个含义在Kotlin中如何实现:

  • 返回多个值 - Collections

返回多个值我们能很容易的想到使用Collections。例如一个
返回ListlistExample的函数。

fun listExample(): List<Int> {
    return listOf(1, 2, 3)
}

listExample().forEach { value -> println(value) }
// 1, 2, 3
  • 值是分开返回的

上面的情况多个值是作为一个整体一起返回的,如果让多个值分开返回就得使用Kotlin的另外一个数据类型Sequences

fun sequenceExample(): Sequence<Int> {
    return sequence {
        Thread.sleep(1000)  // 假装在执行耗时操作
        yield(1)            // 返回值
        Thread.sleep(1000)
        yield(2)
        Thread.sleep(1000)
        yield(3)
    }
}

sequenceExample().forEach { value ->
    println(value)
    println(Date().time)
}

// 结果:
I/System.out: 1
I/System.out: 1605153441181
I/System.out: 2
I/System.out: 1605153442182
I/System.out: 3
I/System.out: 1605153443185

结果可以看出,通过使用sequence可以实现多个结果分批次单个数值的返回。

  • 异步返回

异步返回可以使用susupend函数

suspend fun simpleAsyn(): List<Int> {
    delay(1000)     // 假装在执行异步耗时操作
    return listOf(1, 2, 3)
}
  • Flow实现Return multiple asynchronously computed values over time
fun simpleFlow(): Flow<Int> {
    return flow {
        delay(1000) // 假装在执行异步耗时操作
        emit(1)     // 返回值
        delay(1000)
        emit(2)
        delay(1000)
        emit(3)
    }
}
    
simpleFlow().collect { value -> println(value) }    
    
// 1, 2, 3 

结论:Flow可以简单的理解为异步的Sequence

Flow 的特性

Flow可以理解成异步的Sequence,也具有和Sequence类似的一些特性。

  • 延迟执行

我们的代码如下,打印结果出来可能会比较费解。

val streamOfInts: Flow<Int> = flow {
    println("开始发送===>")
    for (i in 1..3) {
        delay(100)
        println("发送初始值===>$i")
        emit(i)
        }
        println("结束发送===>")
    }
    
streamOfInts.map { value -> value * value }

println("Flow完成===>")

// 打印结果:
println("Flow完成===>")

为什么呢? 因为Flow是惰性的,streamOfInts这个Flow没有执行结束操作函数Flow中的各种中间操作函数不会立即执行

结束操作函数 --- Flow执行函数后的返回结果不是Flow,那这个函数就是结束操作函数

中间操作函数 --- Flow执行函数后的返回结果仍然是Flow

我们接下来修改代码,执行collect这个函数,它的意义是接收Flow传过来的值:

val streamOfInts: Flow<Int> = flow {
    println("开始发送===>")
    for (i in 1..3) {
        delay(100)
        println("发送初始值===>$i")
        emit(i)
        }
        println("结束发送===>")
    }
    
streamOfInts.map { value -> value * value }.collect { println("最终收到 ===> $it") }

// 打印结果:
I/System.out: 开始发送===>
I/System.out: 发送初始值===>1
I/System.out: 最终收到 ===> 1
I/System.out: 发送初始值===>2
I/System.out: 最终收到 ===> 4
I/System.out: 发送初始值===>3
I/System.out: 最终收到 ===> 9
I/System.out: 结束发送===>
I/System.out: Flow完成===>

熟悉RxJava的同学是不是似曾相识的感觉,没错Flow就是冷信号,只有被订阅后才会发送值。

  • 顺序执行

上面的结果我们能看出来,分别对每个值进行了所有的操作后才进行下一个值的处理,即先处理完1,再处理2,再处理3。

1 - 1 - 2 - 4 - 3 - 9

Flow的构造函数

  1. 通过flowOf函数
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
  1. Iterable调用asFlow函数
listOf(1, 2, 3).asFlow()
// 1, 2, 3
  1. 无参但是有返回值的函数(() -> T)调用asFlow函数
fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
  1. 无参但是有返回值的挂起函数(() -> T)调用asFlow函数
suspend fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
  1. Array调用asFlow函数
arrayOf(1,2,3).asFlow()
// 1, 2, 3
  1. LongRange调用asFlow函数
LongRange(1, 5).asFlow().collect { value -> println(value) }

//  1, 2, 3, 4, 5
  1. Empty Flow
emptyFlow<String>()

Flow中间运算函数

Delay相关的运算函数

debounce
  1. 如果两个相邻的值生产出来的时间间隔超过了[timeout]毫秒,就忽过滤掉前一个值
  2. 最后一个值不受影响,总是会被释放emit
flow {
    emit(1)
    delay(3000)
    emit(2)
    delay(1000)
    emit(3)
    delay(1000)
    emit(4)
}.debounce(2000)

// 结果:1 4 
// 解释:
//  2和1的间隔大于2000,1被释放
//  3和2的间隔小于2000, 2被忽略
//  4和3的间隔小于2000, 3被忽略
//  4是最后一个值不受timeout值的影响, 4被释放
  1. [timeout]可以传毫秒,也可以传Duration

注意:还是个实验性的方法,使用的时候需要加上@ExperimentalTime

flow {
    emit(1)
    delay(3000)
    emit(2)
    delay(1000)
    emit(3)
    delay(1000)
    emit(4)
}.debounce(2000.milliseconds)

// 结果:1 4 

Distinct相关的运算函数

distinctUntilChanged
  1. 如果生产的值和上个发送的值相同,值就会被过滤掉
flow {
    emit(1)
    emit(1)
    emit(2)
    emit(2)
    emit(3)
    emit(4)
}.distinctUntilChanged()

// 结果:1 2 3 4
// 解释:
// 第一个1被释放
// 第二个1由于和第一个1相同,被过滤掉
// 第一个2被释放
// 第二个2由于和第一个2相同,被过滤掉
// 第一个3被释放
// 第一个4被释放
  1. 可以传参(old: T, new: T) -> Boolean,进行自定义的比较
private class Person(val age: Int, val name: String)

flow {
    emit(Person(20, "张三"))
    emit(Person(21, "李四"))
    emit(Person(21, "王五"))
    emit(Person(22, "赵六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }
    
// 结果:张三 李四 赵六
// 解释:本例子定义如果年龄相同就认为是相同的值,所以王五被过滤掉了
  1. 可以用distinctUntilChangedBy转换成年龄进行对比
flow {
    emit(Person(20, "张三"))
    emit(Person(21, "李四"))
    emit(Person(21, "王五"))
    emit(Person(22, "赵六"))
}.distinctUntilChangedBy { person -> person.age }

// 结果:张三 李四 赵六

Emitters相关的运算函数

transform

对每个值进行转换

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.transform {
    if (it % 2 == 0) {
        emit(it * it)
    }
}

// 结果:4 16
// 解释:
// 1 不是偶数,被忽略
// 2 是偶数,2的平方4
// 3 不是偶数,被忽略
// 4 是偶数,4的平方16
onStart

第一个值被释放之前被执行

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.onStart { emit(1000) }

// 结果:1000 1 2 3 4
// 解释:
// 第一个值1被释放的时候调用了emit(1000), 所以1000在1之前被释放
onCompletion

最后一个值释放完成之后被执行

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.onCompletion { emit(1000) }

// 结果:1 2 3 4 1000
// 解释:
// 第一个值4被释放的时候调用了emit(1000), 所以1000在4之后被释放

Limit相关的运算函数

drop

忽略最开始的[count]个值

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.drop(2)

// 结果:3 4
// 解释:
// 最开始释放的两个值(1,2)被忽略了
dropWhile

判断第一个值如果满足(T) -> Boolean这个条件就忽略

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.dropWhile {
    it % 2 == 0
}

// 结果:1 2 3 4
// 解释:
// 第一个值不是偶数,所以1被释放

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.dropWhile {
    it % 2 != 0
}

// 结果:2 3 4
// 解释:
// 第一个值是偶数,所以1被忽略
take

只释放前面[count]个值

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.take(2)

// 结果:1 2
// 解释:
// 前面两个值被释放
takeWhile

判断第一个值如果满足(T) -> Boolean这个条件就释放

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.takeWhile { it%2 != 0 }

// 结果:1
// 解释:
// 第一个值满足是奇数条件

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.takeWhile { it%2 == 0 }

// 结果:无
// 解释:
// 第一个值不满足是奇数条件

CoroutineContext相关的运算函数

flowOn

可以切换CoroutineContext

说明:flowOn只影响该运算符之前的CoroutineContext,对它之后的CoroutineContext没有任何影响

withContext(Dispatchers.Main) {
    flow {
        println("Thread name1 = ${Thread.currentThread().name}")
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }
        .map {
            println("Thread name2 = ${Thread.currentThread().name}")
            it * it
        }
        .filter {
            println("Thread name3 = ${Thread.currentThread().name}")
            it > 9
        }
        .flowOn(Dispatchers.IO)
        .collect{value ->
            println("Thread name4 = ${Thread.currentThread().name}")
            println(value)
        }
}


// 结果:
// Thread name1 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name2 = DefaultDispatcher-worker-1
// Thread name3 = DefaultDispatcher-worker-1
// Thread name4 = main
// 16
// 解释:
// flowOn(Dispatchers.IO)之前的flow,map,filter都是在Dispatchers.IO上执行
// flowOn(Dispatchers.IO)之后的collect则由withContext(Dispatchers.Main)决定

buffer

将flow的多个任务分配到不同的协程中去执行,加快执行的速度。

val flow1 = flow {
    delay(2000)  // 假设耗时任务
    emit(1)      // 释放值
    delay(2000) 
    emit(2)
    delay(2000)
    emit(3)
    delay(2000)
    emit(4)
}

flow1.collect { value ->
    println(value)
    delay(4000)
}

// 结果
// 2020-11-16 13:48:37.144 24060-24116/com.johnny.flowdemo I/System.out: 1
// 2020-11-16 13:48:43.150 24060-24116/com.johnny.flowdemo I/System.out: 2
// 2020-11-16 13:48:49.160 24060-24116/com.johnny.flowdemo I/System.out: 3
// 2020-11-16 13:48:55.166 24060-24116/com.johnny.flowdemo I/System.out: 4
// 解释:
// 4个耗时操作每个2000毫秒,加上collect的4000毫秒,所以每个值的时间间隔是6000毫秒。

val flow2 = flow {
    delay(2000)  // 假设耗时任务
    emit(1)      // 释放值
    delay(2000) 
    emit(2)
    delay(2000)
    emit(3)
    delay(2000)
    emit(4)
}.buffer()

flow2.collect { value ->
    println(value)
    delay(4000)
}

// 结果
// 2020-11-16 13:51:11.290 24253-24299/com.johnny.flowdemo I/System.out: 1
// 2020-11-16 13:51:15.293 24253-24299/com.johnny.flowdemo I/System.out: 2
// 2020-11-16 13:51:19.297 24253-24300/com.johnny.flowdemo I/System.out: 3
// 2020-11-16 13:51:23.301 24253-24300/com.johnny.flowdemo I/System.out: 4

// 解释:
// 4个耗时操作被分配到了不同的协程中执行,总共耗时了大约2000毫秒。collect收到的4个值差不多同时,所以每个值依次收到的时间间隔是4000毫秒。
conflate

如果值的生产速度大于值的消耗速度,就忽略掉中间未来得及处理的值,只处理最新的值。

val flow1 = flow {
    delay(2000)
    emit(1)
    delay(2000)
    emit(2)
    delay(2000)
    emit(3)
    delay(2000)
    emit(4)
}.conflate()

flow1.collect { value ->
    println(value)
    delay(5000)
}

// 结果: 1 3 4
// 解释:
// 2000毫秒后生产了1这个值,交由collect执行,花费了5000毫秒,当1这个值执行collect完成后已经经过了7000毫秒。
// 这7000毫秒中,生产了2,但是collect还没执行完成又生产了3,所以7000毫秒以后会直接执行3的collect方法,忽略了2这个值
// collect执行完3后,还有一个4,继续执行。

Flatten相关的运算函数

flatMapConcat

将原始的Flow<T>通过[transform]转换成Flow<Flow<T>>,然后将Flow<Flow<T>>释放的Flow<T>其中释放的值一个个释放。

flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(1000)
    emit(4)
}.flatMapConcat {
    flow {
        emit("$it 产生第一个flow值")
        delay(2500)
        emit("$it 产生第二个flow值")
    }
}.collect { value ->
    println(value)
}

// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值

// 解释:
// 原始Flow<Int>通过flatMapConcat被转换成Flow<Flow<Int>>
// 原始Flow<Int>首先释放1,接着Flow<Flow<Int>> 就会释放 1产生第一个flow值 和 1产生第二个flow值 两个值
// Flow<Int>释放2,...
// Flow<Int>释放3,...
// Flow<Int>释放4,...

flattenConcat

flatMapConcat类似,只是少了一步Map操作。

flow {
    delay(1000)
    emit(flow {
        emit("1 产生第一个flow值")
        delay(2000)
        emit("1 产生第二个flow值") })
    delay(1000)
    emit(flow {
        emit("2 产生第一个flow值")
        delay(2000)
        emit("3 产生第二个flow值") })
    delay(1000)
    emit(flow {
        emit("3 产生第一个flow值")
        delay(2000)
        emit("3 产生第二个flow值") })
    delay(1000)
    emit(flow {
        emit("4 产生第一个flow值")
        delay(2500)
        emit("4 产生第二个flow值") })
    }.flattenConcat()
    
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值

flatMapMerge

将原始的Flow<T>通过[transform]转换成Flow<Flow<T>>,然后将Flow<Flow<T>>释放的Flow<T>其中释放的值一个个释放。

它与flatMapConcat的区别是:Flow<Flow<T>>释放的Flow<T>其中释放的值没有顺序性,谁先产生谁先释放。

flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(1000)
    emit(4)
}.flatMapMerge {
    flow {
        emit("$it 产生第一个flow值")
        delay(2500)
        emit("$it 产生第二个flow值")
    }
}.collect { value ->
    println(value)
}

// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第二个flow值

// 解释:
// 原始Flow<Int>首先释放1, 第二个Flow<Flow<Int>> 释放 1产生第一个flow值,但是 1产生第二个flow值是3500毫秒才释放,2 产生第一个flow值 是2000毫秒释放, 3 产生第一个flow值 是3000毫秒释放,3500毫秒时刻才是 1产生第二个flow值 的释放
flatMapMerge

flattenMerge类似,只是少了一步Map操作。

merge

Iterable<Flow<T>>合并成一个Flow<T>

val flow1 = listOf(
    flow {
        emit(1)
        delay(500)
        emit(2)
    },
    flow {
        emit(3)
        delay(500)
        emit(4)
    },
    flow {
        emit(5)
        delay(500)
        emit(6)
    }
)
flow1.merge().collect { value -> println("$value") }

// 结果: 1 3 5 2 4 6
// 解释:
// 按Iterable的顺序和耗时顺序依次释放值

transformLatest

原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(2000)
    emit(3)
    delay(3000)
    emit(4)
}.transformLatest { value ->
    delay(2500)
    emit(value * value )
}

// 结果: 9 16
// 解释:
// 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2
// 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3
// 原始Flow释放3以后,转换后的Flow有足够的时间释放9
// 原始Flow释放4以后,转换后的Flow有足够的时间释放16
flatMapLatest

transformLatest类似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

区别:flatMapLatesttransform转换成的是Flow<T>, transformLatesttransform转换成的是Unit

flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(2000)
    emit(3)
    delay(3000)
    emit(4)
}.flatMapLatest { value ->
    flow {
        delay(2500)
        emit(value * value )
    }
}

// 结果: 9 16
// 解释:
// 原始Flow释放1以后,转换后的Flow还没来得及释放1,原始Flow释放2
// 原始Flow释放2以后,转换后的Flow还没来得及释放4,原始Flow释放3
// 原始Flow释放3以后,转换后的Flow有足够的时间释放9
// 原始Flow释放4以后,转换后的Flow有足够的时间释放16
mapLatest

transformLatest类似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow

区别:mapLatesttransform转换成的是T,flatMapLatesttransform转换成的是Flow<T>,transformLatesttransform转换成的是Unit

Transform相关的运算函数

filter

通过predicate进行过滤,满足条件则被释放

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.filter { it % 2 == 0 }

// 结果: 2 4
// 解释:
// 2和4满足it % 2 == 0,被释放
filterNot

通过predicate进行过滤,不满足条件则被释放

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.filterNot { it % 2 == 0 }

// 结果: 1 3
// 解释:
// 1和3不满足it % 2 == 0,被释放
filterIsInstance

如果是某个数据类型则被释放

flow {
    emit(1)
    emit("2")
    emit("3")
    emit(4)
}.filterIsInstance<String>()

// 结果: "2" "3"
// 解释:
// "2" "3"是String类型,被释放
filterNotNull

如果数据是非空,则被释放

flow {
    emit(1)
    emit("2")
    emit("3")
    emit(null)
}.filterNotNull()

// 结果: 1 "2" "3"
map

将一个值转换成另外一个值

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.map { it * it }

// 结果: 1 4 9 16
// 解释:
// 将1,2,3,4转换成对应的平方数
mapNotNull

将一个非空值转换成另外一个值

withIndex

将值封装成IndexedValue对象

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.withIndex()

// 结果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)

onEach

每个值释放的时候可以执行的一段代码

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.onEach { println("接收到$it") }

// 结果:
I/System.out: 接收到1
I/System.out: 1
I/System.out: 接收到2
I/System.out: 2
I/System.out: 接收到3
I/System.out: 3
I/System.out: 接收到4
I/System.out: 4
scan

有一个初始值,然后每个值都和初始值进行运算,然后这个值作为后一个值的初始值

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.scan(100) { acc, value ->
    acc * value
}

// 结果: 100 100 200 600 2400
// 解释:
// 初始值 100
// 1  100 * 1 = 100
// 2  100 * 2 = 200
// 3  200 * 3 = 600
// 4  600 * 4 = 2400
runningReduce

scan类似,但是没有初始值,最开始是它本身

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.runningReduce { acc, value ->
    acc * value
}

// 结果: 1 2 6 24
// 解释:
// 1  1
// 2  1 * 2 = 2
// 3  2 * 3 = 6
// 4  6 * 4 = 24

Combine相关的运算函数

任意一个flow释放值且都有释放值后会调用combine后的代码块,且值为每个flow的最新值。

val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }

flow1.combine(flow2) { first, second ->
    "$first$second"
}.collect { println("$it") }

// 结果:1a 2a 2b 3b 4b 4c 4d

// 解释:
// 开始 --- flow1 释放 1,flow2 释放 a, 释放1a
// 10毫秒 --- flow1 释放 2,释放2a
// 20毫秒 --- flow2 释放 b,此时释放2b
// 30毫秒 --- flow1 释放 3,此时释放3b
// 40毫秒 --- flow1 释放 4,此时释放4b
// 40毫秒 --- flow2 释放 c,此时释放4c
// 60毫秒 --- flow2 释放 d,此时释放4d

说明:Combine能够接受的参数类型非常多,但是效果都是如上类似。

Flow结束函数

Collect相关的结束函数

collect

接收值,一直有用,无需再做介绍。

launchIn

scope.launch { flow.collect() }的缩写, 代表在某个协程上下文环境中去接收释放的值

val flow1 = flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(1000)
    emit(4)
}

flow1.onEach { println("$it") }
    .launchIn(GlobalScope)

// 结果:1 2 3 4

collectIndexed

withIndex对应的,接收封装的IndexedValue

val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}.withIndex()

flow1.collectIndexed { index, value ->
    println("index = $index, value = $value")
}

// 结果:
// I/System.out: index = 0, value = IndexedValue(index=0, value=1)
// I/System.out: index = 1, value = IndexedValue(index=1, value=2)
// I/System.out: index = 2, value = IndexedValue(index=2, value=3)
// I/System.out: index = 3, value = IndexedValue(index=3, value=4)

collectLatest

collectLatestcollect的区别是,如果有新的值释放,上一个值的操作如果没执行完则将会被取消

val flow1 = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(2000)
    emit(4)
}

flow1.collectLatest {
    println("正在计算收到的值 $it")
    delay(1500)
    println("收到的值 $it")
}

// 结果:
// I/System.out: 正在计算收到的值 1
// I/System.out: 正在计算收到的值 2
// I/System.out: 正在计算收到的值 3
// I/System.out: 收到的值 3
// I/System.out: 正在计算收到的值 4
// I/System.out: 收到的值 4

// 解释:
// 1间隔1000毫秒后释放2,2间隔1000毫秒后释放3,这间隔小于需要接收的时间1500毫秒,所以当2和3 到来后,之前的操作被取消了。
// 3和4 之间的间隔够长能够等待执行完毕,4是最后一个值也能执行

Collection相关的结束函数

toList

将释放的值转换成List

flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(2000)
    emit(4)
}

println(flow1.toList())

// 结果:[1, 2, 3, 4]

toSet

将释放的值转换成Set

flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(2000)
    emit(4)
}

println(flow1.toSet())

// 结果:[1, 2, 3, 4]

Count相关的结束函数

count
  1. 计算释放值的个数
val flow1 = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(2000)
    emit(4)
}

println(flow1.count())
        
// 结果:4

  1. 计算满足某一条件的释放值的个数
val flow1 = flow {
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
    delay(2000)
    emit(4)
}

println(flow1.count { it % 2 == 0 })
        
// 结果:2
// 解释:
// 偶数有2个值 2 4

Reduce相关的结束函数

reduce

runningReduce类似,但是只计算最后的结果。

val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}
println(flow1.reduce { acc, value -> acc * value })

// 结果:24
// 解释:计算最后的结果,1 * 2 * 3 * 4 = 24
fold

scan类似,有一个初始值,但是只计算最后的结果。

val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}
println(flow1.fold(100) { acc, value -> acc * value })

// 结果:2400
// 解释:计算最后的结果,100 * 1 * 2 * 3 * 4 = 2400
single

只接收一个值的Flow

注意:多于1个或者没有值都会报错

 val flow1 = flow {
    emit(1)
}
println(flow1.single())

// 结果:1
singleOrNull

接收一个值的Flow或者一个空值的Flow

first/firstOrNull
  1. 接收释放的第一个值/接收第一个值或者空值
val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}
println(flow1.first())

// 结果:1
  1. 接收第一个满足某个条件的值
val flow1 = flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
}
println(flow1.first { it % 2 == 0})

// 结果:2

Flow的错误异常处理

  • 可以通过 try catch 捕获错误异常
try {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.collect {
        println("接收值 $it")
        check(it <= 1) { "$it 大于1"  }
    }
} catch (e: Throwable) {
    println("收到了异常: $e")
}

// 结果:
// I/System.out: 接收值 1
// I/System.out: 接收值 2
// I/System.out: 收到了异常: java.lang.IllegalStateException: 2 大于1

// 解释:
// 收到2的时候就抛出了异常,让后flow被取消,异常被捕获
  • 通过catch函数

catch函数能够捕获之前产生的异常,之后的异常无法捕获。

flow {
    emit(1)
    emit(2)
    emit(3)
    emit(4)
    }.map {
        check(it <= 1) { "$it 大于1" }
        it
    }
    .catch { e -> println("Caught $e") }
    .collect()

// 结果:
// Caught java.lang.IllegalStateException: 2 大于1

Flow的取消

Flow的取消可以执行CoroutineScope.cancel

GlobalScope.launch {
    val flow1 = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }
    flow1.collect { value ->
        println("$value")
        if (value >= 3) {
            cancel()
        }
    }
}
        
 // 结果:1 2 3       

StateFlow/MutableStateFlow

我们了解了Flow是冷信号,类似于RxJava中的observables。Kotlin还提供了一个类似于RxJava中的SubjectStateFlowMutableStateFlow是相当于值可变的StateFlow

它有如下几个特点:

  1. StateFlow主要用来管理和表示几种不同的状态

例如:网络请求中(Requesting),网络请求完成(Request Complete),网络请求失败(Request Fail)。

  1. StateFlow只有值变化后才会释放新的值,和distinctUntilChanged类似
  2. collect对于它不是必需的,StateFlow创建的时候就能开始释放值
class CounterModel {
    private val _counter = MutableStateFlow(0)  // 私有使用MutableStateFlow
    val counter = _counter.asStateFlow()   // 对外公开只读的StateFlow

    fun inc() {
        _counter.value++ //更改值
    }
}

val aModel = CounterModel()
val bModel = CounterModel()
val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }

// 两个计时器的结果相加

本文主要讲解了Kotlin Flow的基础知识,下一篇我们将来介绍Kotlin Flow在UI界面编写,网络数据请求和数据库中是如何使用的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容