【Koltin Flow(三)】Flow操作符之中间操作符(一)

目录

【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow

前言

  1. flow的中间操作符比较多,根据作用大概可以分为几个类,如变换操作符、过滤操作符等。
  2. 本篇主要介绍的有变换操作符、过滤操作符、也包含其他的一些操作符。

变换操作符

1.map、mapLatest、mapNotNull,map实现直接的变换操作,collect接收到的为变换后的值,mapLatest则类似于collectLatest变换最新的值,如果来不及变换为上一个则直接取消,变换当前最新的值,mapNotNull则返回变换后不为空的值。

代码如下:
          val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }

            flow.map {
                "map $it"
            }.collect {
                Log.d(TAG.TAG,it)
            }

            flow.mapLatest {
                delay(15)
                "mapLatest $it"
            }.collect {
                Log.d(TAG.TAG,it)
            }

            flow.mapNotNull {
                if (it % 2 == 0){
                    "mapNotNull $it"
                }else{
                    null
                }
            }.collect {
                Log.d(TAG.TAG,it)
            }
日志如下:
2022-07-29 15:37:01.824 10589-10614/edu.test.demo D/Test-TAG: map 0
2022-07-29 15:37:01.836 10589-10614/edu.test.demo D/Test-TAG: map 1
2022-07-29 15:37:01.853 10589-10614/edu.test.demo D/Test-TAG: map 2
2022-07-29 15:37:01.865 10589-10614/edu.test.demo D/Test-TAG: map 3
2022-07-29 15:37:01.880 10589-10614/edu.test.demo D/Test-TAG: map 4
2022-07-29 15:37:01.892 10589-10614/edu.test.demo D/Test-TAG: map 5
2022-07-29 15:37:01.904 10589-10614/edu.test.demo D/Test-TAG: map 6
2022-07-29 15:37:01.915 10589-10614/edu.test.demo D/Test-TAG: map 7
2022-07-29 15:37:01.926 10589-10614/edu.test.demo D/Test-TAG: map 8
2022-07-29 15:37:01.937 10589-10614/edu.test.demo D/Test-TAG: map 9
2022-07-29 15:37:02.108 10589-10617/edu.test.demo D/Test-TAG: mapLatest 9
2022-07-29 15:37:02.128 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 0
2022-07-29 15:37:02.149 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 2
2022-07-29 15:37:02.171 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 4
2022-07-29 15:37:02.196 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 6
2022-07-29 15:37:02.218 10589-10615/edu.test.demo D/Test-TAG: mapNotNull 8
分析:
  • map转换为字符串之后直接全部接收到。
  • mapLatest 因为转换的时候做了延时,超过了发送的延时,所以前面的全部被取消了,只留下了最新的值9。
  • mapNotNull 因为在转换的时候将不能被2整除的数转换为了null,所以都没接收到,只接收到了偶数。
  1. transform、transformLatest、transformWhile ,transform直接进行转换,和map不同的是transform可以控制流速,transformLatest则进行最新值的转换,类似于mapLatest ,transformWhile则要求闭包返回一个boolean值,为true则继续返回,为false则后续的值全部取消。
代码如下:
          val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
           flow.transform {
                delay(1000)
                emit(it*10)
            }.collect {
                Log.d(TAG.TAG,"transform is $it")
            }

            flow.transformLatest {
                delay(1000)
                emit("transformLatest $it")
            }.collect {
                Log.d(TAG.TAG,it)
            }

            flow.transformWhile {
                emit("transformWhile $it")
                it!=5
            }.collect {
                Log.d(TAG.TAG,it)
            }

日志如下:
2022-07-29 15:37:03.243 10589-10615/edu.test.demo D/Test-TAG: transform is 0
2022-07-29 15:37:04.255 10589-10615/edu.test.demo D/Test-TAG: transform is 10
2022-07-29 15:37:05.269 10589-10615/edu.test.demo D/Test-TAG: transform is 20
2022-07-29 15:37:06.281 10589-10615/edu.test.demo D/Test-TAG: transform is 30
2022-07-29 15:37:07.294 10589-10615/edu.test.demo D/Test-TAG: transform is 40
2022-07-29 15:37:08.306 10589-10615/edu.test.demo D/Test-TAG: transform is 50
2022-07-29 15:37:09.318 10589-10615/edu.test.demo D/Test-TAG: transform is 60
2022-07-29 15:37:10.330 10589-10615/edu.test.demo D/Test-TAG: transform is 70
2022-07-29 15:37:11.341 10589-10615/edu.test.demo D/Test-TAG: transform is 80
2022-07-29 15:37:12.353 10589-10615/edu.test.demo D/Test-TAG: transform is 90
2022-07-29 15:37:13.470 10589-10617/edu.test.demo D/Test-TAG: transformLatest 9
2022-07-29 15:37:13.483 10589-10617/edu.test.demo D/Test-TAG: transformWhile 0
2022-07-29 15:37:13.495 10589-10617/edu.test.demo D/Test-TAG: transformWhile 1
2022-07-29 15:37:13.509 10589-10617/edu.test.demo D/Test-TAG: transformWhile 2
2022-07-29 15:37:13.521 10589-10617/edu.test.demo D/Test-TAG: transformWhile 3
2022-07-29 15:37:13.532 10589-10617/edu.test.demo D/Test-TAG: transformWhile 4
2022-07-29 15:37:13.544 10589-10617/edu.test.demo D/Test-TAG: transformWhile 5
分析:
  • 可以看出transform转换的时候控制了流速,变成了每秒发送一个值,接收到的时候每秒打印出一个值。
  • transformLatest只转换了最新的值9.
  • transformWhile 因为在5的时候返回了false,所以后面的值全部被取消了。

过滤操作符

  1. filter、filterNot、filterIsInstance、filterNotNull、fliter闭包返回一个Boolean值,为true则返回,false则不返回,filterNot刚好相反;filterIsInstance则进行类型过滤,如过滤出String或者Int等,filterNotNull则过滤null值,返回非空值。
代码如下:
            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
            flow.filter {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filter $it")
            }

            flow.filterNot {
                it % 2 == 0
            }.collect {
                Log.d(TAG.TAG,"filterNot $it")
            }

            flow {
                emit(1)
                emit("123")
            }.filterIsInstance<String>().collect {
                Log.d(TAG.TAG,"filterIsInstance $it")
            }

            flow {
                emit(1)
                emit(null)
                emit(2)
            }.filterNotNull().collect {
                Log.d(TAG.TAG,"filterNotNull $it")
            }

日志如下:
2022-07-29 15:50:45.376 10675-10703/edu.test.demo D/Test-TAG: filter 0
2022-07-29 15:50:45.400 10675-10703/edu.test.demo D/Test-TAG: filter 2
2022-07-29 15:50:45.422 10675-10703/edu.test.demo D/Test-TAG: filter 4
2022-07-29 15:50:45.444 10675-10703/edu.test.demo D/Test-TAG: filter 6
2022-07-29 15:50:45.466 10675-10703/edu.test.demo D/Test-TAG: filter 8
2022-07-29 15:50:45.505 10675-10703/edu.test.demo D/Test-TAG: filterNot 1
2022-07-29 15:50:45.528 10675-10703/edu.test.demo D/Test-TAG: filterNot 3
2022-07-29 15:50:45.550 10675-10703/edu.test.demo D/Test-TAG: filterNot 5
2022-07-29 15:50:45.574 10675-10703/edu.test.demo D/Test-TAG: filterNot 7
2022-07-29 15:50:45.597 10675-10703/edu.test.demo D/Test-TAG: filterNot 9
2022-07-29 15:50:45.598 10675-10703/edu.test.demo D/Test-TAG: filterIsInstance 123
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 1
2022-07-29 15:50:45.600 10675-10703/edu.test.demo D/Test-TAG: filterNotNull 2
分析:
  • 可以看出 filter 满足条件过滤出了偶数。
  • filterNot同样的条件刚好相反,过滤出了奇数。
  • filterIsInstance过滤出了字符串。
  • filterNotNull过滤了空值。
  1. take、takeWhile、drop、dropWhile,take则是取几个值返回,takeWhile按条件取值,如果满足条件就返回,不满足则后面全部取消。drop和take相反,dropWhile和takeWhile相反。
代码如下:
            val flow = flow {
                repeat(10){
                    delay(10)
                    emit(it)
                }
            }
              flow.take(5).collect {
                Log.d(TAG.TAG,"take $it")
            }

            flow.takeWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }

            flow.drop(5).collect {
                Log.d(TAG.TAG,"drop $it")
            }
            flow.dropWhile {
                it < 5
            }.collect {
                Log.d(TAG.TAG,"dropWhile $it")
            }
日志如下:
2022-07-29 16:04:08.109 11070-11096/edu.test.demo D/Test-TAG: take 0
2022-07-29 16:04:08.120 11070-11096/edu.test.demo D/Test-TAG: take 1
2022-07-29 16:04:08.132 11070-11096/edu.test.demo D/Test-TAG: take 2
2022-07-29 16:04:08.144 11070-11096/edu.test.demo D/Test-TAG: take 3
2022-07-29 16:04:08.169 11070-11096/edu.test.demo D/Test-TAG: take 4
2022-07-29 16:04:08.184 11070-11096/edu.test.demo D/Test-TAG: takeWhile 0
2022-07-29 16:04:08.197 11070-11096/edu.test.demo D/Test-TAG: takeWhile 1
2022-07-29 16:04:08.207 11070-11096/edu.test.demo D/Test-TAG: takeWhile 2
2022-07-29 16:04:08.218 11070-11096/edu.test.demo D/Test-TAG: takeWhile 3
2022-07-29 16:04:08.229 11070-11096/edu.test.demo D/Test-TAG: takeWhile 4
2022-07-29 16:04:08.320 11070-11096/edu.test.demo D/Test-TAG: drop 5
2022-07-29 16:04:08.332 11070-11096/edu.test.demo D/Test-TAG: drop 6
2022-07-29 16:04:08.343 11070-11096/edu.test.demo D/Test-TAG: drop 7
2022-07-29 16:04:08.355 11070-11096/edu.test.demo D/Test-TAG: drop 8
2022-07-29 16:04:08.366 11070-11096/edu.test.demo D/Test-TAG: drop 9
2022-07-29 16:04:08.435 11070-11096/edu.test.demo D/Test-TAG: dropWhile 5
2022-07-29 16:04:08.446 11070-11096/edu.test.demo D/Test-TAG: dropWhile 6
2022-07-29 16:04:08.457 11070-11096/edu.test.demo D/Test-TAG: dropWhile 7
2022-07-29 16:04:08.467 11070-11096/edu.test.demo D/Test-TAG: dropWhile 8
2022-07-29 16:04:08.478 11070-11096/edu.test.demo D/Test-TAG: dropWhile 9
分析:
  • 可以看出take5 就取了前面五个进行返回,drop刚好相反。
  • takeWhile则返回了满足条件的前五个,后面的全部取消,dropWhile刚好相反。
  • 也会有人有疑问,后面的都大于等于5了,所以都取消了,那后面如果出现个1呢,还会不会返回,那么再看如下代码,可以看出,后面即使出现满足条件的也被全部取消了:
flow{
                emit(1)
                emit(2)
                emit(5)
                emit(1)
                emit(2)
            }.takeWhile {
                it<5
            }.collect {
                Log.d(TAG.TAG,"takeWhile $it")
            }
2022-07-29 16:04:08.087 11070-11096/edu.test.demo D/Test-TAG: takeWhile 1
2022-07-29 16:04:08.087 11070-11096/edu.test.demo D/Test-TAG: takeWhile 2
  • 当然这个结论从源码也可以看出来,在注释的地方直接抛出异常,后续不会再执行,如下:
// Internal building block for non-tailcalling flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            // Note: we are checking predicate first, then throw. If the predicate does suspend (calls emit, for example)
            // the the resulting code is never tail-suspending and produces a state-machine
            if (!predicate(value)) {
                //此处predicate如果返回false直接抛出异常
                throw AbortFlowException(this)
            }
        }
    }
    try {
        collect(collector)
    } catch (e: AbortFlowException) {
        e.checkOwnership(collector)
    }
}

  1. debounce、sample,debounce是超时取值,一个值发送之后,计时多少秒再取下一个值,二sample则属于周期性取值,每隔一定的时间周期取最新的值。
代码如下:
            flow {
                emit(1)
                delay(10)
                emit(2)
                delay(1000)
                emit(100)
                delay(500)
                emit(200)
            }.debounce(1000).collect {
                Log.d(TAG.TAG,"debounce $it")
            }

            flow {
                emit(1)
                delay(400)
                emit(2)
                delay(400)
                emit(100)
                emit(200)
                delay(200)
            }.sample(500).collect {
                Log.d(TAG.TAG,"sample $it")
            }
日志如下:
2022-08-01 10:10:12.716 3651-3679/edu.test.demo D/Test-TAG: debounce 2
2022-08-01 10:10:13.221 3651-3679/edu.test.demo D/Test-TAG: debounce 200
2022-08-01 10:10:13.724 3651-3679/edu.test.demo D/Test-TAG: sample 2
2022-08-01 10:10:14.226 3651-3679/edu.test.demo D/Test-TAG: sample 200
分析:
  • 我们看出debounce 取值为2,200,原因为第一次发送为1 之后1000ms内又发送了,所以1被覆盖,2之后1000ms内未发送值,所以2被打印,后面同理,100被200覆盖,最后打印出了200.
  • sample 取值也为2,200,原因为500ms,第一个周期内(0-500ms),1被2覆盖,打印了2,第二个周期内(500-1000ms),100被200覆盖,打印了2
  1. distinctUntilChanged、distinctUntilChanged()、distinctUntilChangedBy,用于去除满足一定条件的值,如distinctUntilChanged()去重,如果两个连续的值一样,则跳过当前值,继续发送后面的值,属于distinctUntilChanged的简化用法,distinctUntilChanged还可以实现其他的功能,如只打印比当前值大的值,条件返回为boolean,为false打印,为true直接过滤,distinctUntilChangedBy也是去重,但是条件字段可以自定义,如根据bean的某一个值去重。
代码如下:
            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(1)
                emit(1)
                emit(1)
            }.distinctUntilChanged().collect {
                Log.d(TAG.TAG,"distinctUntilChanged  $it")
            }

            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(1)
                emit(1)
                emit(1)
            }.distinctUntilChanged { old, new ->
                old > new
            }.collect {
                Log.d(TAG.TAG,"distinctUntilChanged(old, new)  $it")
            }
            
            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(1)
                emit(1)
                emit(1)
            }.distinctUntilChangedBy {
                it
            }.collect {
                Log.d(TAG.TAG,"distinctUntilChangedBy  $it")
            }
日志如下:
2022-08-01 10:33:56.073 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged  1
2022-08-01 10:33:56.073 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged  2
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged  3
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged  1
2022-08-01 10:33:56.074 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new)  1
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new)  2
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChanged(old, new)  3
2022-08-01 10:33:56.075 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy  1
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy  2
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy  3
2022-08-01 10:33:56.076 4332-4359/edu.test.demo D/Test-TAG: distinctUntilChangedBy  1
分析:
  • 可以看出distinctUntilChanged()直接去除了后面两个重复的1, distinctUntilChanged则值打印出来了后面比前面大的值,后面3个1没打印,如果要实现去重,只需要将old>new改成old==new。distinctUntilChangedBy也实现了去重的效果。

总结

  • 本篇主要介绍了中间操作符的一部分,其他的在下一篇继续介绍。
  • 本篇设计到中间操作符的两类操作符,分别为变换操作符、过滤操作符。
  • 操作符本身的理解并不难,只要总结起来结合具体的代码去理解就会比较容易。
  • 本篇为自己学习及使用过程中的总结,难免存在错误或思维局限,欢迎大家讨论指正。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容