目录
【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow
前言
- flow的中间操作符比较多,根据作用大概可以分为几个类,如变换操作符、过滤操作符等。
- 本篇主要介绍的有变换操作符、过滤操作符、也包含其他的一些操作符。
变换操作符
1.map、mapLatest、mapNotNull,map实现直接的变换操作,collect接收到的为变换后的值,mapLatest则类似于collectLatest变换最新的值,如果来不及变换为上一个则直接取消,变换当前最新的值,mapNotNull则返回变换后不为空的值。
代码如下:
val flow = flow {
repeat(10){
delay(10)
emit(it)
}
}
flow.map {
"map $it"
}.collect {
Log.d(TAG.TAG,it)
}
flow.mapLatest {
delay(15)
"mapLatest $it"
}.collect {
Log.d(TAG.TAG,it)
}
flow.mapNotNull {
if (it % 2 == 0){
"mapNotNull $it"
}else{
null
}
}.collect {
Log.d(TAG.TAG,it)
}
日志如下:
2022-07-29 15:37:01.824 10589-10614/edu.test.demo D/Test-TAG: map 0
2022-07-29 15:37:01.836 10589-10614/edu.test.demo D/Test-TAG: map 1
2022-07-29 15:37:01.853 10589-10614/edu.test.demo D/Test-TAG: map 2
2022-07-29 15:37:01.865 10589-10614/edu.test.demo D/Test-TAG: map 3
2022-07-29 15:37:01.880 10589-10614/edu.test.demo D/Test-TAG: map 4
2022-07-29 15:37:01.892 10589-10614/edu.test.demo D/Test-TAG: map 5
2022-07-29 15:37:01.904 10589-10614/edu.test.demo D/Test-TAG: map 6
2022-07-29 15:37:01.915 10589-10614/edu.test.demo D/Test-TAG: map 7
2022-07-29 15:37:01.926 10589-10614/edu.test.demo D/Test-TAG: map 8
2022-07-29 15:37:01.937 10589-10614/edu.test.demo D/Test-TAG: map 9
2022-07-29 15:37:02.108 10589-10617/edu.test.demo D/Test-TAG: mapLatest 9
2022-07-29 15:37:02.128 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 0
2022-07-29 15:37:02.149 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 2
2022-07-29 15:37:02.171 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 4
2022-07-29 15:37:02.196 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 6
2022-07-29 15:37:02.218 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 8
分析:
- map转换为字符串之后直接全部接收到。
- mapLatest 因为转换的时候做了延时,超过了发送的延时,所以前面的全部被取消了,只留下了最新的值9。
- mapNotNull 因为在转换的时候将不能被2整除的数转换为了null,所以都没接收到,只接收到了偶数。
- transform、transformLatest、transformWhile ,transform直接进行转换,和map不同的是transform可以控制流速,transformLatest则进行最新值的转换,类似于mapLatest ,transformWhile则要求闭包返回一个boolean值,为true则继续返回,为false则后续的值全部取消。
代码如下:
val flow = flow {
repeat(10){
delay(10)
emit(it)
}
}
flow.transform {
delay(1000)
emit(it*10)
}.collect {
Log.d(TAG.TAG,"transform is $it")
}
flow.transformLatest {
delay(1000)
emit("transformLatest $it")
}.collect {
Log.d(TAG.TAG,it)
}
flow.transformWhile {
emit("transformWhile $it")
it!=5
}.collect {
Log.d(TAG.TAG,it)
}
日志如下:
2022-07-29 15:37:03.243 10589-10615/edu.test.demo D/Test-TAG: transform is 0
2022-07-29 15:37:04.255 10589-10615/edu.test.demo D/Test-TAG: transform is 10
2022-07-29 15:37:05.269 10589-10615/edu.test.demo D/Test-TAG: transform is 20
2022-07-29 15:37:06.281 10589-10615/edu.test.demo D/Test-TAG: transform is 30
2022-07-29 15:37:07.294 10589-10615/edu.test.demo D/Test-TAG: transform is 40
2022-07-29 15:37:08.306 10589-10615/edu.test.demo D/Test-TAG: transform is 50
2022-07-29 15:37:09.318 10589-10615/edu.test.demo D/Test-TAG: transform is 60
2022-07-29 15:37:10.330 10589-10615/edu.test.demo D/Test-TAG: transform is 70
2022-07-29 15:37:11.341 10589-10615/edu.test.demo D/Test-TAG: transform is 80
2022-07-29 15:37:12.353 10589-10615/edu.test.demo D/Test-TAG: transform is 90
2022-07-29 15:37:13.470 10589-10617/edu.test.demo D/Test-TAG: transformLatest 9
2022-07-29 15:37:13.483 10589-10617/edu.test.demo D/Test-TAG: transformWhile 0
2022-07-29 15:37:13.495 10589-10617/edu.test.demo D/Test-TAG: transformWhile 1
2022-07-29 15:37:13.509 10589-10617/edu.test.demo D/Test-TAG: transformWhile 2
2022-07-29 15:37:13.521 10589-10617/edu.test.demo D/Test-TAG: transformWhile 3
2022-07-29 15:37:13.532 10589-10617/edu.test.demo D/Test-TAG: transformWhile 4
2022-07-29 15:37:13.544 10589-10617/edu.test.demo D/Test-TAG: transformWhile 5
分析:
- 可以看出transform转换的时候控制了流速,变成了每秒发送一个值,接收到的时候每秒打印出一个值。
- transformLatest只转换了最新的值9.
- transformWhile 因为在5的时候返回了false,所以后面的值全部被取消了。
过滤操作符
- filter、filterNot、filterIsInstance、filterNotNull、fliter闭包返回一个Boolean值,为true则返回,false则不返回,filterNot刚好相反;filterIsInstance则进行类型过滤,如过滤出String或者Int等,filterNotNull则过滤null值,返回非空值。
代码如下:
val flow = flow {
repeat(10){
delay(10)
emit(it)
}
}
flow.filter {
it % 2 == 0
}.collect {
Log.d(TAG.TAG,"filter $it")
}
flow.filterNot {
it % 2 == 0
}.collect {
Log.d(TAG.TAG,"filterNot $it")
}
flow {
emit(1)
emit("123")
}.filterIsInstance<String>().collect {
Log.d(TAG.TAG,"filterIsInstance $it")
}
flow {
emit(1)
emit(null)
emit(2)
}.filterNotNull().collect {
Log.d(TAG.TAG,"filterNotNull $it")
}
日志如下:
2022-07-29 15:50:45.376 10675-10703/edu.test.demo D/Test-TAG: filter 0
2022-07-29 15:50:45.400 10675-10703/edu.test.demo D/Test-TAG: filter 2
2022-07-29 15:50:45.422 10675-10703/edu.test.demo D/Test-TAG: filter 4
2022-07-29 15:50:45.444 10675-10703/edu.test.demo D/Test-TAG: filter 6
2022-07-29 15:50:45.466 10675-10703/edu.test.demo D/Test-TAG: filter 8
2022-07-29 15:50:45.505 10675-10703/edu.test.demo D/Test-TAG: filterNot 1
2022-07-29 15:50:45.528 10675-10703/edu.test.demo D/Test-TAG: filterNot 3
2022-07-29 15:50:45.550 10675-10703/edu.test.demo D/Test-TAG: filterNot 5
2022-07-29 15:50:45.574 10675-10703/edu.test.demo D/Test-TAG: filterNot 7
2022-07-29 15:50:45.597 10675-10703/edu.test.demo D/Test-TAG: filterNot 9
2022-07-29 15:50:45.598 10675-10703/edu.test.demo D/Test-TAG: filterIsInstance 123
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 1
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 2
分析:
- 可以看出 filter 满足条件过滤出了偶数。
- filterNot同样的条件刚好相反,过滤出了奇数。
- filterIsInstance过滤出了字符串。
- filterNotNull过滤了空值。
- take、takeWhile、drop、dropWhile,take则是取几个值返回,takeWhile按条件取值,如果满足条件就返回,不满足则后面全部取消。drop和take相反,dropWhile和takeWhile相反。
代码如下:
val flow = flow {
repeat(10){
delay(10)
emit(it)
}
}
flow.take(5).collect {
Log.d(TAG.TAG,"take $it")
}
flow.takeWhile {
it < 5
}.collect {
Log.d(TAG.TAG,"takeWhile $it")
}
flow.drop(5).collect {
Log.d(TAG.TAG,"drop $it")
}
flow.dropWhile {
it < 5
}.collect {
Log.d(TAG.TAG,"dropWhile $it")
}
日志如下:
2022-07-29 16:04:08.109 11070-11096/edu.test.demo D/Test-TAG: take 0
2022-07-29 16:04:08.120 11070-11096/edu.test.demo D/Test-TAG: take 1
2022-07-29 16:04:08.132 11070-11096/edu.test.demo D/Test-TAG: take 2
2022-07-29 16:04:08.144 11070-11096/edu.test.demo D/Test-TAG: take 3
2022-07-29 16:04:08.169 11070-11096/edu.test.demo D/Test-TAG: take 4
2022-07-29 16:04:08.184 11070-11096/edu.test.demo D/Test-TAG: takeWhile 0
2022-07-29 16:04:08.197 11070-11096/edu.test.demo D/Test-TAG: takeWhile 1
2022-07-29 16:04:08.207 11070-11096/edu.test.demo D/Test-TAG: takeWhile 2
2022-07-29 16:04:08.218 11070-11096/edu.test.demo D/Test-TAG: takeWhile 3
2022-07-29 16:04:08.229 11070-11096/edu.test.demo D/Test-TAG: takeWhile 4
2022-07-29 16:04:08.320 11070-11096/edu.test.demo D/Test-TAG: drop 5
2022-07-29 16:04:08.332 11070-11096/edu.test.demo D/Test-TAG: drop 6
2022-07-29 16:04:08.343 11070-11096/edu.test.demo D/Test-TAG: drop 7
2022-07-29 16:04:08.355 11070-11096/edu.test.demo D/Test-TAG: drop 8
2022-07-29 16:04:08.366 11070-11096/edu.test.demo D/Test-TAG: drop 9
2022-07-29 16:04:08.435 11070-11096/edu.test.demo D/Test-TAG: dropWhile 5
2022-07-29 16:04:08.446 11070-11096/edu.test.demo D/Test-TAG: dropWhile 6
2022-07-29 16:04:08.457 11070-11096/edu.test.demo D/Test-TAG: dropWhile 7
2022-07-29 16:04:08.467 11070-11096/edu.test.demo D/Test-TAG: dropWhile 8
2022-07-29 16:04:08.478 11070-11096/edu.test.demo D/Test-TAG: dropWhile 9
分析:
- 可以看出take5 就取了前面五个进行返回,drop刚好相反。
- takeWhile则返回了满足条件的前五个,后面的全部取消,dropWhile刚好相反。
- 也会有人有疑问,后面的都大于等于5了,所以都取消了,那后面如果出现个1呢,还会不会返回,那么再看如下代码,可以看出,后面即使出现满足条件的也被全部取消了:
flow{
emit(1)
emit(2)
emit(5)
emit(1)
emit(2)
}.takeWhile {
it<5
}.collect {
Log.d(TAG.TAG,"takeWhile $it")
}
2022-07-29 16:04:08.087 11070-11096/edu.test.demo D/Test-TAG: takeWhile 1
2022-07-29 16:04:08.087 11070-11096/edu.test.demo D/Test-TAG: takeWhile 2
- 当然这个结论从源码也可以看出来,在注释的地方直接抛出异常,后续不会再执行,如下:
// Internal building block for non-tailcalling flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
// Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
// the the resulting code is never tail-suspending and produces a state-machine
if (!predicate(value)) {
//此处predicate如果返回false直接抛出异常
throw AbortFlowException(this)
}
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}
- debounce、sample,debounce是超时取值,一个值发送之后,计时多少秒再取下一个值,二sample则属于周期性取值,每隔一定的时间周期取最新的值。
代码如下:
flow {
emit(1)
delay(10)
emit(2)
delay(1000)
emit(100)
delay(500)
emit(200)
}.debounce(1000).collect {
Log.d(TAG.TAG,"debounce $it")
}
flow {
emit(1)
delay(400)
emit(2)
delay(400)
emit(100)
emit(200)
delay(200)
}.sample(500).collect {
Log.d(TAG.TAG,"sample $it")
}
日志如下:
2022-08-01 10:10:12.716 3651-3679/edu.test.demo D/Test-TAG: debounce 2
2022-08-01 10:10:13.221 3651-3679/edu.test.demo D/Test-TAG: debounce 200
2022-08-01 10:10:13.724 3651-3679/edu.test.demo D/Test-TAG: sample 2
2022-08-01 10:10:14.226 3651-3679/edu.test.demo D/Test-TAG: sample 200
分析:
- 我们看出debounce 取值为2,200,原因为第一次发送为1 之后1000ms内又发送了,所以1被覆盖,2之后1000ms内未发送值,所以2被打印,后面同理,100被200覆盖,最后打印出了200.
- sample 取值也为2,200,原因为500ms,第一个周期内(0-500ms),1被2覆盖,打印了2,第二个周期内(500-1000ms),100被200覆盖,打印了2
- distinctUntilChanged、distinctUntilChanged()、distinctUntilChangedBy,用于去除满足一定条件的值,如distinctUntilChanged()去重,如果两个连续的值一样,则跳过当前值,继续发送后面的值,属于distinctUntilChanged的简化用法,distinctUntilChanged还可以实现其他的功能,如只打印比当前值大的值,条件返回为boolean,为false打印,为true直接过滤,distinctUntilChangedBy也是去重,但是条件字段可以自定义,如根据bean的某一个值去重。
代码如下:
flow {
emit(1)
emit(2)
emit(3)
emit(1)
emit(1)
emit(1)
}.distinctUntilChanged().collect {
Log.d(TAG.TAG,"distinctUntilChanged $it")
}
flow {
emit(1)
emit(2)
emit(3)
emit(1)
emit(1)
emit(1)
}.distinctUntilChanged { old, new ->
old > new
}.collect {
Log.d(TAG.TAG,"distinctUntilChanged(old, new) $it")
}
flow {
emit(1)
emit(2)
emit(3)
emit(1)
emit(1)
emit(1)
}.distinctUntilChangedBy {
it
}.collect {
Log.d(TAG.TAG,"distinctUntilChangedBy $it")
}
日志如下:
2022-08-01 10:33:56.073 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged 1
2022-08-01 10:33:56.073 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged 2
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged 3
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged 1
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new) 1
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new) 2
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new) 3
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy 1
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy 2
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy 3
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy 1
分析:
- 可以看出distinctUntilChanged()直接去除了后面两个重复的1, distinctUntilChanged则值打印出来了后面比前面大的值,后面3个1没打印,如果要实现去重,只需要将old>new改成old==new。distinctUntilChangedBy也实现了去重的效果。
总结
- 本篇主要介绍了中间操作符的一部分,其他的在下一篇继续介绍。
- 本篇设计到中间操作符的两类操作符,分别为变换操作符、过滤操作符。
- 操作符本身的理解并不难,只要总结起来结合具体的代码去理解就会比较容易。
- 本篇为自己学习及使用过程中的总结,难免存在错误或思维局限,欢迎大家讨论指正。