Flow源码阅读笔记

上篇看了Flow的基本用法,这篇文章就从源码的角度来看看Flow的运行机制

1.Flow创建
fun simpleFlow() = flow<Int> {  
    for (i in 1..3) {  
        delay(100)  
        emit(i)  
    }  
}

看一下flow函数的定义

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

参数类型为

@BuilderInference block: suspend FlowCollector<T>.() -> Unit

这里的参数,可以理解为 入参是一个函数,该函数是FlowCollector的一个扩展函数,没有入参,也没有出参(返回值为Unit,相当于java的void)。对于这块不理解的,可以参阅 这里

flow函数调用了 SafeFlow的构造函数

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
        collector.block()  
    }  
}

AbstractFlow 代码也比较简单,稍后再说

到这里,Flow创建圆满结束了

2.接收 collect 函数

前面介绍过,Flow为冷流,冷流不会发射数据,只有到了收集(末端操作符)的时候,数据才开始生产并被发射出去。接下来就来看看emit和collect怎么发生的关联。先来看一下collect函数

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =  
    collect(object : FlowCollector<T> {  
        override suspend fun emit(value: T) = action(value)  
    })

这里可以看出,FlowCollector的emit方法,实际上调用的是collect传入的action方法。但是,我们创建Flow的FlowCollector是如何与collect方法传入的FlowCollector产生关系的呢?

关键就在于SafeFlow这个类

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {  
    override suspend fun collectSafely(collector: FlowCollector<T>) {  
       collector.block()  
    }  
}

AbstractFlow代码如下

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi  
    public final override suspend fun collect(collector: FlowCollector<T>) {  
        val safeCollector = SafeCollector(collector, coroutineContext)  
        try {  
            collectSafely(safeCollector)  
        } finally {  
            safeCollector.releaseIntercepted()  
        }  
    }  

    /**  
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.     *     * A valid implementation of this method has the following constraints:     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.     *    The emission should happen in the context of the [collect] call.     *    Please refer to the top-level [Flow] documentation for more details.     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not     *    thread-safe by default.     *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]     *     * @throws IllegalStateException if any of the invariants are violated.  
     */    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)  
}

到这里可以看出SafeFlow的collect方法,实际调用的是collectSafely方法,最终是collect生成的FlowCollector调用创建时传入的block方法。

有点绕,再捋一遍。

flow构造时,传入FlowCollector的扩展方法,我们称此方法为block

当collect方法调用时,传入参数action,首先将此action方法包装成FlowCollector,我们称之为safeCollector

而collect最终调用的为safeCollector.block

到此,我们就理解了,为什么Flow是冷流了,只有末端操作符才会调用其构造时的block

3.协程切换flowOn方法

直接看源码

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {  
    checkFlowContext(context)  
    return when {  
        context == EmptyCoroutineContext -> this  
        this is FusibleFlow -> fuse(context = context)  
        else -> ChannelFlowOperatorImpl(this, context = context)  
    }  
}

这里的when方法比较有意思,没有参数。kotlin的when支持没有参数的条件跳转,无参时需要各种条件都是一个boolean型表达式, 参见这里

以ChannelFlowOperatorImpl为例来看一下

internal class ChannelFlowOperatorImpl<T>(  
flow: Flow<T>,  
context: CoroutineContext = EmptyCoroutineContext,  
capacity: Int = Channel.OPTIONAL_CHANNEL,  
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {  

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =  
        ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
    override fun dropChannelOperators(): Flow<T> = flow  

    override suspend fun flowCollect(collector: FlowCollector<T>) =  
        flow.collect(collector)  
}

这里没什么有价值的代码,由于ChannelFlowOperatorImpl继承自ChannelFlowOperator看一下ChannelFlowOperator的代码

internal abstract class ChannelFlowOperator<S, T>(
    @JvmField protected val flow: Flow<S>,
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    protected abstract suspend fun flowCollect(collector: FlowCollector<T>)

    // Changes collecting context upstream to the specified newContext, while collecting in the original context
    private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {
        val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)
        // invoke flowCollect(originalContextCollector) in the newContext
        return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)
    }

    // Slow path when output channel is required
    protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

    // Optimizations for fast-path when channel creation is optional
    override suspend fun collect(collector: FlowCollector<T>) {
        // Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            val collectContext = coroutineContext
            val newContext = collectContext + context // compute resulting collect context
            // #1: If the resulting context happens to be the same as it was -- fallback to plain collect
            if (newContext == collectContext)
                return flowCollect(collector)
            // #2: If we don't need to change the dispatcher we can go without channels
            if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
                return collectWithContextUndispatched(collector, newContext)
        }
        // Slow-path: create the actual channel
        super.collect(collector)
    }

    // debug toString
    override fun toString(): String = "$flow -> ${super.toString()}"
}

collect执行的时候,如果指定的协程与现在的不一致,则走collectWithContextUndispatched方法,走到下面这个方法

internal suspend fun <T, V> withContextUndispatched(
    newContext: CoroutineContext,
    value: V,
    countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
    block: suspend (V) -> T
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        withCoroutineContext(newContext, countOrElement) {
            block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
        }
    }

withCoroutineContext这个方法就是协程切换的地方了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容