从 Flow exception transparency is violated 开始对 Flow 实现进行分析

前两天在线上发现了这样的一个异常:

IllegalStateException
Flow exception transparency is violated:
    Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
    Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
    For a more detailed explanation, please refer to Flow documentation.

然后我就全局搜了一下这个错误,发现是在下面的这个方法中抛出来的:

kotlinx.coroutines.flow.internal.SafeCollector#exceptionTransparencyViolated

private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
    /*
     * Exception transparency ensures that if a `collect` block or any intermediate operator
     * throws an exception, then no more values will be received by it.
     * For example, the following code:
     * ```
     * val flow = flow {
     *     emit(1)
     *     try {
     *          emit(2)
     *     } catch (e: Exception) {
     *          emit(3)
     *     }
     * }
     * // Collector
     * flow.collect { value ->
     *     if (value == 2) {
     *         throw CancellationException("No more elements required, received enough")
     *     } else {
     *         println("Collected $value")
     *     }
     * }
     * ```
     * is expected to print "Collected 1" and then "No more elements required, received enough" exception,
     * but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
     */
    error("""
        Flow exception transparency is violated:
            Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
            Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
            For a more detailed explanation, please refer to Flow documentation.
        """.trimIndent())

什么情况下才会有上面这样的错误呢?

我们把上面注释中的代码 copy 出来运行一下:

suspend fun main() {
    val flow = flow {
        emit(1)
        try {
            emit(2)
        } catch (e: Exception) {
            emit(3)
        }
    }
    // Collector
    flow.collect { value ->
        if (value == 2) {
            throw CancellationException("No more elements required, received enough")
        } else {
            println("Collected $value")
        }
    }
}

问题复现了。那这个问题是说什么呢?翻译一下就是:

Flow 流违反了异常透明度:

前一个 “emit” 调用已经抛出了一个 exception,但是在这之后检测到还在发射 $value。为了避免未指定的行为,禁止从 catch 块中发射数据,可以使用 Flow.catch 来替代。

上面的话有些难理解,通俗来讲就是禁止在 try { emit(xx) } catch 的 catch 块中发射数据,这样的话会违反 Flow 异常的透明性。

下面我们来分析一下 Flow 的代码来看看为什么会违反 Flow 异常的透明。

flow 方法的代码如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

注意这里的 block 函数是我们的 lamble 表达式,也就是我们调用 emit 发射数据的函数,它是 FlowCollector<T>.() -> Unit 类型的扩展函数。

flow 函数返回了一个 SafeFlow 传入了 block 函数。

接下来我们来看看 collect 的函数:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

collect 函数定义在 Flow 接口中。这里我们调用 collect 函数传入的是类型为 FlowCollector<T> 的函数式接口。上面调用 flow 返回的是 SafeFlow,所以 collect 函数的实现肯定也在 SafeFlow 这个类中。我们来看看 SafeFlow 的实现。

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    // ...
}

SafeFlow 继承自 AbstractFlow,我们这里调用的是 AbstractFlowcollect 方法,这里创建了一个 SafeCollector ,然后调用了 collectorSafely 方法,在 SafeFlow 的 collectSafely 方法中又调用了 block 方法,注意这里的 block 就是调用 flow 函数传入的 lambda 代表的扩展方法。(没想到吧,还能这么玩)

所以我们调用的 emit 发射方法的实现是在 SafeCollector 中。如下:

override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            emit(uCont, value)
        } catch (e: Throwable) {
            // Save the fact that exception from emit (or even check context) has been thrown
            lastEmissionContext = DownstreamExceptionElement(e)
            throw e
        }
    }
}

emit 是一个 suspend 的方法,通过调用 suspendCoroutineUninterceptedOrReturn 方法获取到当前 suspend 函数的 Continuation ,然后调用了非 suspend 的 emit 方法。注意这里的 try-catch,如果调用非 emit 的方法出现了异常,将会把异常赋值给 lastEmissionContext,然后抛出当前捕获的异常。接下来我们来看看 非 suspendemit 方法。

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context // 当前的协程的 context
    currentContext.ensureActive() // 确保当前协程处于 active 状态(也就是没有取消,或者发生错误等)
    // This check is triggered once per flow on happy path.
    val previousContext = lastEmissionContext // 前一次的协程 context,如果前面 emit 发生了异常,那么这里的 previouseContext 将会是 DownstreamExceptionElement
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value) // 检查
        lastEmissionContext = currentContext
    }
    completion = uCont
    val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    /*
     * If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
     * and we don't have to retain a strong reference to it to avoid memory leaks.
     */
    if (result != COROUTINE_SUSPENDED) {
        completion = null
    }
    return result
}

private fun checkContext(
    currentContext: CoroutineContext,
    previousContext: CoroutineContext?,
    value: T
) {
    if (previousContext is DownstreamExceptionContext) {
        exceptionTransparencyViolated(previousContext, value)
    }
    checkContext(currentContext)
}

在方法的一开始获取了当前协程的 Context,然后将 lastEmissionContext赋值给 previouseContext 。如果当前的 context 的引用不等于前一个 context 的引用(注意这里用的是 !==),那么就会调用 checkContext 来进行检查,在检查完成后会把 lastEmissionContext 赋值未当前的 currentContext

checkContext 中,如果 previousContextDownstreamExceptionContext 那么就会调用 exceptionTransparencyViolated 来抛出我们最开始提到的异常了。

private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
    /*
     * Exception transparency ensures that if a `collect` block or any intermediate operator
     * throws an exception, then no more values will be received by it.
     * For example, the following code:
     * ```
     * val flow = flow {
     *     emit(1)
     *     try {
     *          emit(2)
     *     } catch (e: Exception) {
     *          emit(3)
     *     }
     * }
     * // Collector
     * flow.collect { value ->
     *     if (value == 2) {
     *         throw CancellationException("No more elements required, received enough")
     *     } else {
     *         println("Collected $value")
     *     }
     * }
     * ```
     * is expected to print "Collected 1" and then "No more elements required, received enough" exception,
     * but if exception transparency wasn't enforced, "Collected 1" and "Collected 3" would be printed instead.
     */
    error("""
        Flow exception transparency is violated:
            Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
            Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
            For a more detailed explanation, please refer to Flow documentation.
        """.trimIndent())

那么什么情况下会触发这个异常呢?我们可以看到官方的注释中给出了一个例子。从上面的分析中我们也可以得出结论,如果我们对 emit 方法进行 try-catch,并且在 catch 中调用 emit 发射数据那么就会抛出上面的异常了。

思考一下:为什么要抛出这样的异常?

我们来继续分析,等分析完后,你可能就会有答案了。

检查没问题的话,接下来调用了 emitFun

@Suppress("UNCHECKED_CAST")
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

public fun interface FlowCollector<in T> {

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}

这里对 Kotlin 和 Java 理解不够深的话,理解起来还是比较困难的。我们逐步来拆解。emitFun 是一个变量(这里不能说是一个函数类型的变量,因为它的类型不是一个高级函数),它的类型是 Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>。那么为什么能把 FlowCollector<Any?>::emit 转换为 Function3 呢?

首先从 Jvm 字节码的角度出发,Follectoremit 就是一个方法,emit 的参数列表中的第一个参数 this 就是 Follector,最后一个参数是 Kotlin 协程 suspend 函数添加的 Continuation,所以这里可以转换为 Funcation3

需要注意的,这里的 collector 就是我们调用 collect 时传入的 lambda 表达式所对应的对象,所以这里相当于调用了它的 emit 函数。可能还是有点难以理解,我们把上面的程序中的 lambda 表达式全部替换成匿名内部类,再来理解一下。

suspend fun main() {
    val flow = flow {
        emit(1)
        try {
            emit(2)
        } catch (e: Exception) {
            emit(3)
        }
    }
    // Collector
    flow.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) {
            if (value == 2) {
                throw CancellationException("No more elements required, received enough")
            } else {
                println("Collected $value")
            }
        }
    })
}

所以我们在 flow 中传入的 lambda 表达式中调用 emit 方法是 SafeCollectoremit 方法,这个 emit 方法,最终会调用到我们调用 collect 传入的 FlowCollectoremit 方法。

那么这里就会有一个问题,我们在 flow 上游发射数据,如果对 emit 方法加 try-catch,那么可能会 catch 到下游的异常,这违反了 flow 异常的透明性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容