kotlin flow (二)

Flow操作符

  • buffer(int)

该操作符会新起一个协程来收集buffer之前的代码运行结果,新协程通过channel通知flow所在的协程,
并且与当前flow所在协成并行运行,如果缓冲区满了,会暂停产生新的数据等到收集器把缓冲区的数据消费完。
参数指定缓冲区的大小

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .collect { println("2$it") }
输出
Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--

flowOf("A", "B", "C")
    .onEach  { println("1$it") }
    .buffer()  // <--------------- buffer between onEach and collect
    .collect { println("2$it") }
输出
P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }

                      |
                      | channel               // buffer()
                      V

Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect

协序之间使用一个通道将协同程序P发出的元素发送到协同程序Q。如果缓冲操作符之前的代码(协同程序P中)比缓冲操作符之后的代码(协调程序Q中)快,则该通道将在某个时间点变满,并将暂停生产者协同程序P,直到消费者协同程序Q赶上。capacity参数定义此缓冲区的大小。

  • catch

捕获catch上游不包含取消异常的所有异常

fun errorTest(){
        viewModelScope.launch {
            flowOf(1,2)
                .map { repository.error() }
                .catch {
                    Log.d(TAG, "errorTest:catch $it")
                }
                .flowOn(Dispatchers.IO)
                .collect {
                    Log.d(TAG, "errorTest:collect $it")
                }
        }
    }

//error方法直接抛出异常

fun error():String{
        throw NullPointerException()
        //return "Test catch"
    }
//打印结果
FlowViewModel: errorTest:catch java.lang.NullPointerException
  • collect

终端操作符,触发flow运行并收集flow中发送的数据,是挂起函数只能在协程中调用

  • collectIndexed

带下标的收集

  • collectLatest

与 collect的区别是 ,有新值发出时,如果此时上个收集尚未完成,则会取消掉上个值的收集操作

  • combine

生成一个新的flow,改flow的值通过{}中的转换规则,组合每个流最近发出的值生成

val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
    println(it) // Will print "1a 2a 2b 2c"
}
  • combineTransform

combine+Transform

  • conflate

buffer(CONFLATED)实现的,主要区别在于buffer会等待收集器而conflate抛弃中间的数据收集最新的数据,因为发送数据不会停止而收集数据又比较慢,所以当收集器收集数据的时候可能数据已经发送了很多条,收集器只收集最新的一条

fun conflateTest() {
        viewModelScope.launch {
            val flow = flow {
                for (i in 1..30) {
                    delay(100)
                    emit(i)
                }
            }
            flow.conflate()
                .onEach { delay(200) }
                .collect {
                    Log.d(TAG, "conflateTest:conflate $it")
                }
        }
    }
打印结果
conflateTest:conflate 1
conflateTest:conflate 2
conflateTest:conflate 4
conflateTest:conflate 6
conflateTest:conflate 8
conflateTest:conflate 10
conflateTest:conflate 12
conflateTest:conflate 14
conflateTest:conflate 16
conflateTest:conflate 18
conflateTest:conflate 20
conflateTest:conflate 22
conflateTest:conflate 24
conflateTest:conflate 26
conflateTest:conflate 28
conflateTest:conflate 30

发送的数据比收集快一倍刚好中间间隔一个数字

  • consumeAsFlow

将channel转换成flow,只能有一个数据收集者,如果添加多个数据收集器会抛出llegalStateException异常

mDemoChannel.consumeAsFlow()
  • receiveAsFlow

将channel转换成flow,允许有多个收集器,但不是多播,可能会轮流收到值

  • count()返回此流中的元素数

fun countFlow(){
        viewModelScope.launch {
            val count = flowOf(1,2,3,4).count()
            Log.d(TAG, "countFlow:count $count")
        }
    }
FlowViewModel: countFlow:count 4
  • debounce

返回一个新flow,改flow中的元素是通过给定时间内最新的一个值,其他的过滤掉

fun debounceTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                delay(90)
                emit(2)
                delay(90)
                emit(3)
                delay(1010)
                emit(4)
                delay(1010)
                emit(5)
            }.debounce(1000).collect {
                Log.d(TAG, "countFlow:count $it")
            }
        }
    }
输出:3,4,5
  • distinctUntilChanged

返回一个flow,过滤连续重复的值

fun distinctUntilChangedTest() {
        viewModelScope.launch {
            flowOf("hello","kotlin","kotlin","hello")
                .distinctUntilChanged()
                .collect {
                    Log.d(TAG, "distinctUntilChangedTest:distinctUntilChanged $it")
                }
        }
    }
输出:hello,kotlin,hello
  • distinctUntilChangedBy

返回一个flow,过滤指定条件的重复值

fun distinctUntilChangedByTest(){
        viewModelScope.launch {
            flowOf(
                Product("cat",30.98),
                Product("cat",99.23),
                Product("cat",0.22)
            ).distinctUntilChangedBy { it.name }.collect {
                Log.d(TAG, "distinctUntilChangedByTest:distinctUntilChangedBy $it")
            }
        }
    }
输出:Product(name=cat, price=30.98)
  • drop

返回一个flow,丢弃前count个数的元素

fun dropTest(){
        viewModelScope.launch {
            flowOf(1,2,3,4).drop(2).collect {
                Log.d(TAG, "dropTest:drop $it")
            }
        }
    }
输出:3,4
  • dropWhile

测试结果:给定条件和第一个元素匹配,匹配成功将后面所有的都返回,匹配失败返回所有。

条件==1
 fun dropWhileTest(){
        viewModelScope.launch {
            flowOf(1,2,3,4,5,6).dropWhile { it == 1 }.collect {
                Log.d(TAG, "dropWhileTest:dropWhile $it")
            }
        }
    }
输出:2,3,4,5,6
条件==3
fun dropWhileTest(){
        viewModelScope.launch {
            flowOf(1,2,3,4,5,6).dropWhile { it == 3 }.collect {
                Log.d(TAG, "dropWhileTest:dropWhile $it")
            }
        }
    }
输出:1,2,3,4,5,6
  • emitAll

将指定Channel中的所有数据发送到flow中并关闭

private fun initChannelData(){
        viewModelScope.launch {
            for (i in 1..5){
                emitAllChannel.send("hello$i")
            }
        }
    }
 fun emitAllTest(){
        viewModelScope.launch {
            flow {
                emitAll(emitAllChannel)
            }.collect {
                Log.d(TAG, "emitAllTest:emitAll $it")
            }
        }
    }
输出:hello1,hello2,hello3,hello4,hello5
  • filter

过滤给定条件的数据,返回flow

fun filterTest(){
        viewModelScope.launch {
            flowOf("A","B","C","D","b","c")
                .filter {
                    it == "A"
                }.collect {
                    Log.d(TAG, "filterTest:filter $it")
                }
        }
    }
输出:A
  • filterIsInstance

过滤给定类型的数据,返回flow

fun filterIsInstanceTest() {
        viewModelScope.launch {
            flowOf(1,"2",3,"4")
                .filterIsInstance<String>()
                .collect {
                    Log.d(TAG, "filterIsInstanceTest:filterIsInstance $it")
                }
        }
    }
其中String是给定的类型
输出:2,4
  • filterNot

查找不符合条件的所有数据。返回flow,和filter相反

fun filterNotText(){
        viewModelScope.launch {
            flowOf(1,2,3,4).filterNot {
                it == 2
            }.collect {
                Log.d(TAG, "filterNotText:filterNot $it")
            }
        }
    }
输出:1,2,3
  • filterNotNull

过滤掉null值,返回flow

fun filterNotNullTest(){
        viewModelScope.launch {
            flowOf(1,null,3,"hello")
                .filterNotNull()
                .collect {
                Log.d(TAG, "filterNotNullTest:filterNotNull $it")
            }
        }
    }
输出:1,3,hello
  • first

返回flow中的第一个元素,并且结束flow的终端运算符,如果是空就会抛出异常

fun firstTest(){
        viewModelScope.launch {
            val result = flowOf(1, 2, 3, 3)
                .first()
            Log.d(TAG, "firstTest:first $result")
        }
    }
输出:1
  • firstOrNull

和first一样,firstOrNull可以为空

  • flatMapConcat

这是一个组合操作符,相当于 map + flattenConcat, 通过 map 转成一个流,在通过 flattenConcat展开合并成一个流

  • flatMapLatest

和其他 带 Latest的操作符 一样,如果下个值来了,上变换还没结束,就取消掉。

fun flatMapLatestTest(){
        viewModelScope.launch {
            flow {
                emit("a")
                delay(100)
                emit("b")
            }.flatMapLatest {
                flow {
                    emit(it)
                    delay(200)
                    emit(it+"_last")
                }
            }.collect {
                Log.d(TAG, "flatMapLatestTest:flatMapLatest $it")
            }
        }
    }
输出:a,b,b_last
  • flatMapMerge

该运算符连续调用transforms,然后合并结果流,并对并发收集的流的数量进行并发限制,这是map(transform).flattMerge(concurrency)的快捷方式

 fun flatMapMergeText(){
        viewModelScope.launch {
            flowOf(1,2,3,4,5)
                .flatMapMerge(3){
                    flow {
                        emit(it)
                    }
                }.collect {
                    Log.d(TAG, "flatMapMergeText:flatMapMerge $it")
                }
        }
    }
输出:1,2,3,4,5
  • flattenConcat

按顺序将给定的flow展开为单个flow

fun flattenConcatTest(){
        viewModelScope.launch {
            flow {
                emit(flowOf(1,2,3))
                emit(flowOf(4,5,6))
            }.flattenConcat().collect {
                Log.d(TAG, "flattenConcatTest:flattenConcat $it")
            }
        }
    }
输出:1,2,3,4,5,6
  • flattenMerge

和flattenConcat功能类似,参数是限制并发数量,concurrency=1时和flattenConcat一样,大于1时并发收集

fun flattenMergeTest(){
        viewModelScope.launch {
            flow {
                emit(flowOf(1,2,3).flowOn(Dispatchers.IO))
                emit(flowOf(4,5,6).flowOn(Dispatchers.IO))
                emit(flowOf(7,8,9).flowOn(Dispatchers.IO))
            }.flattenMerge(3).collect {
                Log.d(TAG, "flattenMergeTest:flattenMerge $it")
            }
        }
    }
输出:1,2,3,7,8,9,4,5,6输出的顺序不固定
  • fold

从初始值开始遍历根据给定规则变换值,并将结果作为下次执行的参数
官方翻译是累加值

fun foldTest(){
        viewModelScope.launch {
            val result = flowOf(2,3,4)
                .fold(1) { beforeResult, flowValue ->
                    beforeResult * flowValue
                }
            Log.d(TAG, "foldTest:fold $result")
        }
    }
输出:24,计算步骤1*2=2,2*3=6,6*4=24
  • reduce

和fold差不多,无初始值

  • last

获取flow最后一个元素,如果flow为空抛出NoSuchElementException

  • lastOrNull

返回流发出的最后一个元素的终端运算符,如果流为空,则返回null

  • launchIn

launchIn()是viewModelScope.launch{flow.collect()}的简写,launchIn()指定flow收集操作的作用域

fun launchInTest(){
        viewModelScope.launch {
            withContext(Dispatchers.IO){
                //IO线程
                flow {
                    emit(1)
                }.onEach {
                    Log.d(TAG, "launchInTest:launchIn ${Thread.currentThread().name}")
                }.launchIn(viewModelScope)//指定作用域执行
            }
        }
    }
输出:main,是主线程,flow整体在IO线程里面launchIn指定了viewModelScope作用域
  • map

将flow发出的值根据给定规则转换然后发出,lambda的返回值是最终发出的值

fun mapTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(4)
            }.map {
                it * 2
            }.collect {
                Log.d(TAG, "mapTest:map $it")
            }
        }
    }
输出:2,8
  • mapLatest

和map类似,区别在于,当发送新当值当时候如果上次的转换还没结束将会取消上次的转换

fun mapLatest(){
        viewModelScope.launch {
            flow {
                emit(1)
                delay(100)
                emit(4)
            }.mapLatest {
                delay(200)
                it * 2
            }.collect {
                Log.d(TAG, "mapLatest: $it")
            }
        }
    }
输出:8,忽略了第一次的转换
  • mapNotNull

和map类似,区别在于mapNotNull过滤掉空值

  • merge

将多个flow合并成一个flow,不保留元素的顺序

fun mergeTest() {
        viewModelScope.launch {
            val oneFlow = flowOf(1, 2)
            val twoFlow = flowOf("a", "b")
            val threeFlow = flowOf("x", "y")
            listOf(oneFlow, twoFlow, threeFlow).merge().collect {
                Log.d(TAG, "mergeTest: $it")
            }
        }
    }
输出:1,2,a,b,x,y
  • onCompletion

当取消或结束的时候调用
相当与把flow包装到异常捕获的finally块中,一定会执行

fun onCompletionTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                delay(300)
                emit(3)
            }.onCompletion {
                Log.d(TAG, "onCompletionTest: 完成")
            }.collect {
                Log.d(TAG, "onCompletionTest: $it")
            }
        }
    }
输出:1,2,3,完成
  • onEach

上游数据向下游发送之前调用

fun onEachTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                delay(300)
                emit(3)
            }.onEach {
                Log.d(TAG, "onCompletionTest: $it")
            }.collect()
        }
    }
输出:1,2,3
  • onEmpty

当流完成却没有发出任何元素时回调。 可以用来兜底。

fun onEmptyTest() {
        viewModelScope.launch {
            emptyFlow<Int>()
                .onEmpty {
                    Log.d(TAG, "onEmptyTest: onEmpty")
                    emit(1)
                }.collect {
                    Log.d(TAG, "onEmptyTest: $it")
                }
        }
    }
输出:onEmpty,1
  • onStart

当上游开始发送数据之前调用

fun onStartTest(){
        viewModelScope.launch {
            flow {
                emit("a")
                emit("b")
            }.onStart {
                emit("start")
            }.collect{
                Log.d(TAG, "onStartTest: $it")
            }
        }
    }
输出:start,a,b
  • produceIn

将flow转换成ReceiveChannel

fun produceInTest(){
        viewModelScope.launch {
            flowOf(1,2,3).produceIn(this).consumeEach {
                Log.d(TAG, "produceInTest: $it")
            }
        }
    }
输出:1,2,3
  • retry

重试机制,当flow发生异常时可以重试

fun retryTest() {
        viewModelScope.launch {
            flow<Int> {
                Log.d(TAG, "produceInTest: 主动发出异常")
                throw IOException()
            }.retry(3){ e ->
                //筛选什么情况下重试,true重试,false不重试,默认是true
                val b = e is IOException
                b
            }.catch {
                Log.d(TAG, "produceInTest: 扑捕获异常")
            }.collect()
        }
    }
输出:主动发出异常,主动发出异常,扑捕获异常
  • retryWhen

有条件的进行重试

fun retryWhenTest() {
        viewModelScope.launch {
            flow<Int> {
                Log.d(TAG, "retryWhenTest: 主动发出异常")
                throw IOException()
            }.retryWhen { cause, attempt ->
                //attempt重试的次数,从0开始
                Log.d(TAG, "retryWhenTest: $attempt")
                if (attempt > 2) {
                    false
                } else {
                    cause is IOException
                }
            }.catch {
                Log.d(TAG, "retryWhenTest: 扑捕获异常")
            }.collect()
        }
    }
输出:主动发出异常,0,主动发出异常,1,主动发出异常,2,主动发出异常,3,扑捕获异常
  • runningFold

区别于 fold ,就是返回一个新流,将每步的结果发射出去。

  • runningReduce

区别于 reduce ,就是返回一个新流,将每步的结果发射出去。

  • sample

返回一个flow,在指定周期内,收集原始flow发出的最新值

fun sampleTest() {
        viewModelScope.launch {
            flow {
                repeat(10) {
                    emit(it)
                    delay(100)
                }
            }.sample(200).collect {
                Log.d(TAG, "sampleTest: $it")
            }
        }
    }
输出:1,3,5,7,9
  • scan

和 fold 相似,区别是fold 返回的是最终结果,scan返回的是个flow ,会把初始值和每一步的操作结果发送出去。

  • single

只收集一个值,如果为空或游多个值都会抛出异常

  • singleOrNull

接收流发送的第一个值 ,可以为空 ,发出多值的话除第一个,后面均被置为null

  • take

收集前n个数据

fun takeTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                emit(3)
            }.take(2).collect {
                Log.d(TAG, "takeTest: $it")
            }
        }
    }
输出:1,2
  • takeWhile

也是找第一个不满足条件的项,但是取其之前的值 ,和dropWhile 相反。

fun takeWhileTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            } .takeWhile { it <3  } .collect {
                Log.d(TAG, "takeTest: $it")
            }
        }
    }
输出:1,2
  • toCollection

将flow收集到集合中

 fun toCollectionTest(){
        val array = arrayListOf<Int>()
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            }.toCollection(array)
        }
        array.forEach {
            Log.d(TAG, "toCollectionTest: $it")
        }
    }
输出:1,2,3,4
  • toList

和toCollection类似,将flow结果收集到list中,参数可以指定一个list

fun toListTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            }.toList().forEach {
                Log.d(TAG, "toListTest: $it")
            }
        }
    }
输出:1,2,3,4
  • toSet

将flow结果收集到set中

  • transform

转换flow发出的值

fun transformTest(){
        viewModelScope.launch {
            flow {
                emit(1)
                emit(2)
            }.transform{ value ->
                if (value == 1){
                    emit("One$value")
                }
                emit("other$value")
            }.collect {
                Log.d(TAG, "transformTest: $it")
            }
        }
    }
输出:One1,other1,other2
  • transformLatest

和transform类似,区别在于发出新值的时候,上次的转换还没结束就会取消

  • transformWhile

transformWhile转换函数中最后一行如果是true则应用转换函数,如果是false将不应用

  • withIndex

将flow结果包装成带有索引的flow,索引从0开始

fun withIndexTest(){
        viewModelScope.launch {
            flow {
                emit("a")
                emit("b")
            } .withIndex().collect {
                Log.d(TAG, "withIndexTest: ${it.index},${it.value}")
            }
        }
    }
输出:(0,a), (1,b)
  • zip

对两个flow进行组合,分别从二者取值,一旦一个流结束了,那整个过程就结束了

 fun zipTest(){
        viewModelScope.launch {
            val flow = flowOf(1, 2, 3).onEach { delay(10) }
            val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
            flow.zip(flow2) { i, s -> i.toString() + s }.collect {
                Log.d(TAG, "zipTest: $it")
            }
        }
    }
输出:1a,2b,3c
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容