Flow源码阅读笔记

上篇看了Flow的基本用法,这篇文章就从源码的角度来看看Flow的运行机制

1.Flow创建
fun simpleFlow() = flow<Int> {  
    for (i in 1..3) {  
        delay(100)  
        emit(i)  
    }  
}

看一下flow函数的定义

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

参数类型为

@BuilderInference block: suspend FlowCollector<T>.() -> Unit

这里的参数,可以理解为 入参是一个函数,该函数是FlowCollector的一个扩展函数,没有入参,也没有出参(返回值为Unit,相当于java的void)。对于这块不理解的,可以参阅 这里

flow函数调用了 SafeFlow的构造函数

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
        collector.block()  
    }  
}

AbstractFlow 代码也比较简单,稍后再说

到这里,Flow创建圆满结束了

2.接收 collect 函数

前面介绍过,Flow为冷流,冷流不会发射数据,只有到了收集(末端操作符)的时候,数据才开始生产并被发射出去。接下来就来看看emit和collect怎么发生的关联。先来看一下collect函数

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =  
    collect(object : FlowCollector<T> {  
        override suspend fun emit(value: T) = action(value)  
    })

这里可以看出,FlowCollector的emit方法,实际上调用的是collect传入的action方法。但是,我们创建Flow的FlowCollector是如何与collect方法传入的FlowCollector产生关系的呢?

关键就在于SafeFlow这个类

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
       collector.block()  
    }  
}

AbstractFlow代码如下

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi  
    public final override suspend fun collect(collector: FlowCollector<T>) {  
        val safeCollector = SafeCollector(collector, coroutineContext)  
        try {  
            collectSafely(safeCollector)  
        } finally {  
            safeCollector.releaseIntercepted()  
        }  
    }  

    /**  
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.     *     * A valid implementation of this method has the following constraints:     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.     *    The emission should happen in the context of the [collect] call.     *    Please refer to the top-level [Flow] documentation for more details.     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not     *    thread-safe by default.     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]     *     * @throws IllegalStateException if any of the invariants are violated.  
     */    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)  
}

到这里可以看出SafeFlow的collect方法,实际调用的是collectSafely方法,最终是collect生成的FlowCollector调用创建时传入的block方法。

有点绕,再捋一遍。

flow构造时,传入FlowCollector的扩展方法,我们称此方法为block

当collect方法调用时,传入参数action,首先将此action方法包装成FlowCollector,我们称之为safeCollector

而collect最终调用的为safeCollector.block

到此,我们就理解了,为什么Flow是冷流了,只有末端操作符才会调用其构造时的block

3.协程切换flowOn方法

直接看源码

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {  
    checkFlowContext(context)  
    return when {  
        context == EmptyCoroutineContext -> this  
        this is FusibleFlow -> fuse(context = context)  
        else -> ChannelFlowOperatorImpl(this, context = context)  
    }  
}

这里的when方法比较有意思,没有参数。kotlin的when支持没有参数的条件跳转,无参时需要各种条件都是一个boolean型表达式, 参见这里

以ChannelFlowOperatorImpl为例来看一下

internal class ChannelFlowOperatorImpl<T>(  
flow: Flow<T>,  
context: CoroutineContext = EmptyCoroutineContext,  
capacity: Int = Channel.OPTIONAL_CHANNEL,  
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {  

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =  
        ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    override fun dropChannelOperators(): Flow<T> = flow  

    override suspend fun flowCollect(collector: FlowCollector<T>) =  
        flow.collect(collector)  
}

这里没什么有价值的代码,由于ChannelFlowOperatorImpl继承自ChannelFlowOperator看一下ChannelFlowOperator的代码

internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

    // Changes collecting context upstream to the specified newContext, while collecting in the original context
    private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
        val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
        // invoke flowCollect(originalContextCollector) in the newContext
        return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
    }

    // Slow path when output channel is required
    protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

    // Optimizations for fast-path when channel creation is optional
    override suspend fun collect(collector: FlowCollector<T>) {
        // Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            val collectContext = coroutineContext
            val newContext = collectContext + context // compute resulting collect context
            // #1: If the resulting context happens to be the same as it was -- fallback to plain collect
            if (newContext == collectContext)
                return flowCollect(collector)
            // #2: If we don't need to change the dispatcher we can go without channels
            if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
                return collectWithContextUndispatched(collector, newContext)
        }
        // Slow-path: create the actual channel
        super.collect(collector)
    }

    // debug toString
    override fun toString(): String = "$flow -> ${super.toString()}"
}

collect执行的时候,如果指定的协程与现在的不一致,则走collectWithContextUndispatched方法,走到下面这个方法

internal suspend fun <T, V> withContextUndispatched(
    newContext: CoroutineContext,
    value: V,
    countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
    block: suspend (V) -> T
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        withCoroutineContext(newContext, countOrElement) {
            block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
        }
    }

withCoroutineContext这个方法就是协程切换的地方了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容