Kotlin Flow啊,你将流向何方?

前言

前边一系列的协程文章铺垫了很久,终于要分析Flow了。如果说协程是Kotlin的精华,那么Flow就是协程的精髓。
通过本篇文章,你将了解到:

  1. 什么是流?
  2. 为什么引进Flow?
  3. Fow常见的操作
  4. 为什么说Flow是冷流

1. 什么是流

image.png

自然界的流水,从高到低,从上游到下游流动。

而对于计算机世界的流:

数据的传递过程构成了数据流,简称流

比如想要查找1~1000内的偶数,可以这么写:

    var i = 0
    var list = mutableListOf<Int>()
    while (i < 1000) {
        if (i % 2 == 0)
            list.add(i)
        i++
    }

此处对数据的处理即为找出其中的偶数。
若想要在偶数中找到>500的数,则继续筛选:

    var i = 0
    var list = mutableListOf<Int>()
    while (i < 1000) {
        if (i > 500 && i % 2 == 0)
            list.add(i)
        i++
    }

可以看出,原始数据是1~1000,我们对它进行了一些操作:过滤偶数、过滤>500的数。当然还可以进行其它操作,如映射、变换等。
提取上述过程三要素:

  1. 原始数据
  2. 对数据的一系列操作
  3. 最终的数据

把这一系列的过程当做流:


image.png

从流的方向来观察,我们称原始数据为上流,对数据进行一系列处理后,最终的数据为下流。
从流的属性来观察,我们认为生产者在上流生产数据,消费者在下流消费数据。

2. 为什么引进Flow?

由前面的文章我们知道,Java8提供了StreamAPI,专用来操作流,而Kotlin也提供了Sequence来处理流。
那为什么还要引进Flow呢?
在Kotlin的世界里当然不会想再依赖Java的StreamAPI了,主要来对比Kotlin里的各种方案选择。
先看应用场景的演变。

a、集合获取多个值
想要获取多个值,很显而易见的想到了集合。

    fun testList() {
        //构造集合
        fun list(): List<Int> = listOf(1, 2, 3)
        list().forEach {
            //获取多个值
            println("value = $it")
        }
    }

以上函数功能涉及两个对象:生产者和消费者。
生产者:负责将1、2、3构造为集合。
消费者:负责从集合里将1、2、3取出。
若此时想要控制生产者的速度,比如先将1放到集合里,过1秒后再讲2放进集合,在此种场景下该函数显得不那么灵活了。

b、Sequence控制生成速度
Sequence可以生产数据,先看看它是怎么控制生产速度的。

    fun testSequence() {
        fun sequence():Sequence<Int> = sequence {
            for (i in 1..3) {
                Thread.sleep(1000)
                yield(i)
            }
        }
        sequence().forEach {
            println("value = $it")
        }
    }

通过阻塞线程控制了生产者的速度。
你可能会说:在协程体里为啥要用Thread.sleep()阻塞线程呢,用delay()不香吗?
看起来很香,我们来看看实际效果:

image.png

直接报编译错误了,提示是:受限制的挂起函数只能调用自己协程作用域内的成员和其它挂起函数。
而sequence的作用域是SequenceScope,查看其定义发现:
image.png

究其原因,SequenceScope 被RestrictsSuspension 修饰限制了。

c、集合配合协程使用
sequence 因为协程作用域的限制,不能异步生产数据,而使用集合却没此限制。

    suspend fun testListDelay() {
        suspend fun list():List<Int> {
            delay(1000)
            return listOf(1, 2, 3)
        }
        list().forEach {
            println("value = $it")
        }
    }

但也暴露了一个缺陷,只能一次性的返回集合元素。

综上所述:

不管是集合还是Sequence,都不能完全覆盖流的需求,此时Flow闪亮登场了

3. Fow常见的操作

最简单的Flow使用

    suspend fun testFlow1() {
        //生产者
        var flow = flow {
            //发射数据
            emit(5)
        }

        //消费者
        flow.collect {
            println("value=$it")
        }
    }

通过flow函数构造一个flow对象,然后通过调用flow.collect收集数据。
flow函数的闭包为生产者的生产逻辑,collect函数的闭包为消费者的消费逻辑。

当然,还有更简单的写法:

    suspend fun testFlow2() {
        //生产者
        flow {
            //发射数据
            emit(5)
        }.collect {
            //消费者
            println("value=$it")
        }
    }

执行流程:


image.png

Flow操作符

上面只提到了flow数据的发送以及接收,并没有提及对flow数据的操作。
flow提供了许多操作符方便我们对数据进行处理(对流进行加工)。
我们以寻找1~1000内大于500的偶数为例:

    suspend fun testFlow3() {
        //生产者
        var flow = flow {
            for (i in 1..1000) {
                emit(i)
            }
        }.filter { it > 500 && it % 2 == 0 }

        //消费者
        flow.collect {
            println("value=$it")
        }
    }

filter函数的作用根据一定的规则过滤数据,一般称这种函数为flow的操作符。
当然还可以对flow进行映射、变换、异常处理等。

    suspend fun testFlow3() {
        //生产者
        var flow = flow {
            for (i in 1..1000) {
                emit(i)
            }
        }.filter { it > 500 && it % 2 == 0 }
            .map { it - 500 }
            .catch {
                //异常处理
            }

        //消费者
        flow.collect {
            println("value=$it")
        }
    }



中间操作符
前面说过流的三要素:原始数据、对数据的操作、最终数据,对应到Flow上也是一样的。
flow的闭包里我们看做是原始数据,而filter、map、catch等看做是对数据的操作,collect闭包里看做是最终的数据。
filter、map等操作符属于中间操作符,它们负责对数据进行处理。

中间操作符仅仅只是预先定义一些对流的操作方式,并不会主动触发动作执行

末端操作符
末端操作符也叫做终端操作符,调用末端操作符后,Flow将从上流发出数据,经过一些列中间操作符处理后,最后流到下流形成最终数据。
如上面的collect操作符就是其中一种末端操作符。

怎么区分中间操作符和末端操作符呢?
和Sequence操作符类似,可以通过返回值判断。
先看看中间操作符filter:

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}

可以看出,filter操作符仅仅只是构造了Flow对象,并重写了collect函数。

再看末端操作符collect:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

返回值为Unit,并且通过调用collect最终调用了emit,触发了流。

Flow相比Sequence、Collection的优势

Sequence对于协程的支持不够好,不能调用其作用域外的suspend函数,而Collection生产数据不够灵活,来看看Flow是如何解决这些问题的。

    suspend fun testFlow4() {
        //生产者
        var flow = flow {
            for (i in 1..1000) {
                delay(1000)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//切换到io线程执行
        
        //消费者
        flow.collect {
            delay(1000)
            println("value=$it")
        }
    }

如上,flow的生产者、消费者闭包里都支持调用协程的suspend函数,同时也支持切换线程执行。
再者,flow可以将集合里的值一个个发出,可调整其流速。
当然,flow还提供了许多操作符帮助我们实现各种各样的功能,此处限于篇幅就不再深入。
万变不离其宗,知道了原理,一切迎刃而解。

4. 为什么说Flow是冷流

flow 的流动

在sequence的分析里有提到过sequence是冷流,那么什么是冷流呢?

没有消费者,生产者不会生产数据
没有观察者,被观察者不会发送数据

    suspend fun testFlow5() {
        //生产者
        var flow = flow {
            println("111")
            for (i in 1..1000) {
                emit(i)
            }
        }.filter {
            println("222")
            it > 500 && it % 2 == 0
        }.map {
            println("333")
            it - 500
        }.catch {
            println("444")
            //异常处理
        }

如上代码,只要生产者没有消费者,该函数运行后不会有任何打印语句输出。
这个时候将消费者加上,就会触发流的流动。

还是以最简单的flow demo为例,看看其调用流程:


image.png

图上1~6步骤即为最简单的flow调用流程。
可以看出,只有调用了末端操作符(如collect)之后才会触发flow的流动,因此flow是冷流。

flow 的原理

    suspend fun testFlow1() {
        //生产者
        var flow = flow {
            //发射数据
            emit(5)
        }

        //消费者
        flow.collect {
            println("value=$it")
        }
    }

以上代码涉及到三个关键函数(flow、emit、collect),两个闭包(flow闭包、collect闭包。
从上面的调用图可知,以上五者的调用关系:

flow-->collect-->flow闭包-->emit-->collect闭包

接下来逐一分析在代码里的关系。

先看生产者动作(flow函数)
flow函数实现:


public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

传入的参数类型为:FlowCollector的扩展函数,而FlowCollector是接口,它有唯一的函数:emit(xx)。因此在flow函数的闭包里可以调用emit(xx)函数,flow闭包作为SafeFlow的成员变量block。
flow 函数返回SafeFlow,SafeFlow继承自AbstractFlow,并实现了collect函数:

#Flow.kt
    public final override suspend fun collect(SafeCollector: FlowCollector<T>) {
        //构造SafeCollector
        //collector 作为SafeCollector的成员变量
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            //抽象函数,子类实现
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

collect的闭包作为SafeCollector的成员变量collector,后面会用到。
由此可见:flow函数仅仅只是构造了flow对象并返回。

再看消费者动作(collect)
当消费者调用flow.collect函数时:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

此时调用的collect即为flow里定义的collect函数,并构造了匿名对象FlowCollector,实现了emit函数,而emit函数的真正实现为action,也就是外层传入的collect的闭包。

上面分析到的collect源码里调用了collectSafely:

    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }

此处的block即为在构造flow对象时传入的闭包。
此时,消费者通过collect函数已经调用到生产者的闭包里

还剩下最后一个问题:生产者的闭包是如何流转到消费者的闭包里呢?

最后看发射动作(emit)
在生产者的闭包里调用了emit函数:

    override suspend fun emit(value: T) {
        //挂起函数
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //uCont为当前协程续体
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that exception from emit (or even check context) has been thrown
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }

    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //collector.emit 最终调用collect的闭包
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

如此一来,生产者的闭包里调用emit函数后,将会调用到collect的闭包里,此时数据从flow的上游流转到下游。
总结以上步骤,其实本质还是对象调用。

中间操作符的原理
以filter为例:

    public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
        //判断过滤条件是否满足,若是则发送数据
        if (predicate(value)) return@transform emit(value)
    }

    internal inline fun <T, R> Flow<T>.unsafeTransform(
        @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
        //调用当前对象collect
        collect { value ->
            // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
            return@collect transform(value)
        }
    }

    internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
        //构造flow,重写collect
        return object : Flow<T> {
            override suspend fun collect(collector: FlowCollector<T>) {
                collector.block()
            }
        }
    }

filter操作符构造了新的flow对象,该对象重写了collect函数。
当调用flow.collect时,先调用到filter对象的collect,进而调用到原始flow的collect,接着调用到原始flow对象的闭包,在闭包里调用的emit即为filter的闭包,若filter闭包里条件满足则调动emit函数,最后调用到collect的闭包。


image.png

理解中间操作符的要点:

  1. 中间操作符返回新的flow对象,重写了collect函数
  2. collect函数会调用当前flow(调用filter的flow对象)的collect
  3. collect函数做其它的处理

与sequence类似,使用了装饰者模式。
以上以filter为例阐述了原理,其它中间操作符的原理类似,此处就不再细说。

下篇将分析Flow的背压与线程切换,相信分析的逻辑会让大家耳目一新,敬请期待~

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易懂易学系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容