从 Flow exception transparency is violated 开始对 Flow 实现进行分析

前两天在线上发现了这样的一个异常:

IllegalStateException
Flow exception transparency is violated:
    Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.

然后我就全局搜了一下这个错误,发现是在下面的这个方法中抛出来的:

kotlinx.coroutines.flow.internal.SafeCollector#exceptionTransparencyViolated

private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
    /*
     * Exception transparency ensures that if a `collect` block or any intermediate operator
     * throws an exception, then no more values will be received by it.
     * For example, the following code:
     * ```
     * val flow = flow {
     *     emit(1)
     *     try {
     *          emit(2)
     *     } catch (e: Exception) {
     *          emit(3)
     *     }
     * }
     * // Collector
     * flow.collect { value ->
     *     if (value == 2) {
     *         throw CancellationException("No more elements required, received enough")
     *     } else {
     *         println("Collected $value")
     *     }
     * }
     * ```
     * is expected to print "Collected 1" and then "No more elements required, received enough" exception,
     * but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
     */
    error("""
        Flow exception transparency is violated:
            Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
            Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
            For a more detailed explanation, please refer to Flow documentation.
        """.trimIndent())

什么情况下才会有上面这样的错误呢?

我们把上面注释中的代码 copy 出来运行一下:

suspend fun main() {
    val flow = flow {
        emit(1)
        try {
            emit(2)
        } catch (e: Exception) {
            emit(3)
        }
    }
    // Collector
    flow.collect { value ->
        if (value == 2) {
            throw CancellationException("No more elements required, received enough")
        } else {
            println("Collected $value")
        }
    }
}

问题复现了。那这个问题是说什么呢?翻译一下就是:

Flow 流违反了异常透明度:

前一个 “emit” 调用已经抛出了一个 exception,但是在这之后检测到还在发射 $value。为了避免未指定的行为,禁止从 catch 块中发射数据,可以使用 Flow.catch 来替代。

上面的话有些难理解,通俗来讲就是禁止在 try { emit(xx) } catch 的 catch 块中发射数据,这样的话会违反 Flow 异常的透明性。

下面我们来分析一下 Flow 的代码来看看为什么会违反 Flow 异常的透明。

flow 方法的代码如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

注意这里的 block 函数是我们的 lamble 表达式,也就是我们调用 emit 发射数据的函数,它是 FlowCollector<T>.() -> Unit 类型的扩展函数。

flow 函数返回了一个 SafeFlow 传入了 block 函数。

接下来我们来看看 collect 的函数:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

collect 函数定义在 Flow 接口中。这里我们调用 collect 函数传入的是类型为 FlowCollector<T> 的函数式接口。上面调用 flow 返回的是 SafeFlow,所以 collect 函数的实现肯定也在 SafeFlow 这个类中。我们来看看 SafeFlow 的实现。

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    // ...
}

SafeFlow 继承自 AbstractFlow,我们这里调用的是 AbstractFlowcollect 方法,这里创建了一个 SafeCollector ,然后调用了 collectorSafely 方法,在 SafeFlow 的 collectSafely 方法中又调用了 block 方法,注意这里的 block 就是调用 flow 函数传入的 lambda 代表的扩展方法。(没想到吧,还能这么玩)

所以我们调用的 emit 发射方法的实现是在 SafeCollector 中。如下:

override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            emit(uCont, value)
        } catch (e: Throwable) {
            // Save the fact that exception from emit (or even check context) has been thrown
            lastEmissionContext = DownstreamExceptionElement(e)
            throw e
        }
    }
}

emit 是一个 suspend 的方法,通过调用 suspendCoroutineUninterceptedOrReturn 方法获取到当前 suspend 函数的 Continuation ,然后调用了非 suspend 的 emit 方法。注意这里的 try-catch,如果调用非 emit 的方法出现了异常,将会把异常赋值给 lastEmissionContext,然后抛出当前捕获的异常。接下来我们来看看 非 suspendemit 方法。

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context // 当前的协程的 context
    currentContext.ensureActive() // 确保当前协程处于 active 状态(也就是没有取消,或者发生错误等)
    // This check is triggered once per flow on happy path.
    val previousContext = lastEmissionContext // 前一次的协程 context,如果前面 emit 发生了异常,那么这里的 previouseContext 将会是 DownstreamExceptionElement
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value) // 检查
        lastEmissionContext = currentContext
    }
    completion = uCont
    val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    /*
     * If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
     * and we don't have to retain a strong reference to it to avoid memory leaks.
     */
    if (result != COROUTINE_SUSPENDED) {
        completion = null
    }
    return result
}

private fun checkContext(
    currentContext: CoroutineContext,
    previousContext: CoroutineContext?,
    value: T
) {
    if (previousContext is DownstreamExceptionContext) {
        exceptionTransparencyViolated(previousContext, value)
    }
    checkContext(currentContext)
}

在方法的一开始获取了当前协程的 Context,然后将 lastEmissionContext赋值给 previouseContext 。如果当前的 context 的引用不等于前一个 context 的引用(注意这里用的是 !==),那么就会调用 checkContext 来进行检查,在检查完成后会把 lastEmissionContext 赋值未当前的 currentContext

checkContext 中,如果 previousContextDownstreamExceptionContext 那么就会调用 exceptionTransparencyViolated 来抛出我们最开始提到的异常了。

private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
    /*
     * Exception transparency ensures that if a `collect` block or any intermediate operator
     * throws an exception, then no more values will be received by it.
     * For example, the following code:
     * ```
     * val flow = flow {
     *     emit(1)
     *     try {
     *          emit(2)
     *     } catch (e: Exception) {
     *          emit(3)
     *     }
     * }
     * // Collector
     * flow.collect { value ->
     *     if (value == 2) {
     *         throw CancellationException("No more elements required, received enough")
     *     } else {
     *         println("Collected $value")
     *     }
     * }
     * ```
     * is expected to print "Collected 1" and then "No more elements required, received enough" exception,
     * but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
     */
    error("""
        Flow exception transparency is violated:
            Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
            Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
            For a more detailed explanation, please refer to Flow documentation.
        """.trimIndent())

那么什么情况下会触发这个异常呢?我们可以看到官方的注释中给出了一个例子。从上面的分析中我们也可以得出结论,如果我们对 emit 方法进行 try-catch,并且在 catch 中调用 emit 发射数据那么就会抛出上面的异常了。

思考一下:为什么要抛出这样的异常?

我们来继续分析,等分析完后,你可能就会有答案了。

检查没问题的话,接下来调用了 emitFun

@Suppress("UNCHECKED_CAST")
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

public fun interface FlowCollector<in T> {

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}

这里对 Kotlin 和 Java 理解不够深的话,理解起来还是比较困难的。我们逐步来拆解。emitFun 是一个变量(这里不能说是一个函数类型的变量,因为它的类型不是一个高级函数),它的类型是 Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>。那么为什么能把 FlowCollector<Any?>::emit 转换为 Function3 呢?

首先从 Jvm 字节码的角度出发,Follectoremit 就是一个方法,emit 的参数列表中的第一个参数 this 就是 Follector,最后一个参数是 Kotlin 协程 suspend 函数添加的 Continuation,所以这里可以转换为 Funcation3

需要注意的,这里的 collector 就是我们调用 collect 时传入的 lambda 表达式所对应的对象,所以这里相当于调用了它的 emit 函数。可能还是有点难以理解,我们把上面的程序中的 lambda 表达式全部替换成匿名内部类,再来理解一下。

suspend fun main() {
    val flow = flow {
        emit(1)
        try {
            emit(2)
        } catch (e: Exception) {
            emit(3)
        }
    }
    // Collector
    flow.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            if (value == 2) {
                throw CancellationException("No more elements required, received enough")
            } else {
                println("Collected $value")
            }
        }
    })
}

所以我们在 flow 中传入的 lambda 表达式中调用 emit 方法是 SafeCollectoremit 方法,这个 emit 方法,最终会调用到我们调用 collect 传入的 FlowCollectoremit 方法。

那么这里就会有一个问题,我们在 flow 上游发射数据,如果对 emit 方法加 try-catch,那么可能会 catch 到下游的异常,这违反了 flow 异常的透明性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容