为了实现结构化并发,其中一个很重要的命题就是协程的取消。
在使用kotlin协程进行开发前,有两个概念我们始终是绕不开的,
一个是协程本身,一个是suspend函数。
协程通过调用协程作用域CoroutineScope的顶层函数,如launch启动。而协程本身也是一个CoroutineScope,我们可以在协程内部再次launch新的协程,而没有显示通过指定协程作用域启动的协程,都是当前协程的子协程。
通过如上所描述的关系,我们可以发现从顶层作用域开始,到最底层的叶子协程。形成了一颗树。kotlin协程内部使用Job这个类对这个树的结点进行抽象封装。
ParentJob (Scope)
│
├── Job1 (scope.launch { ... })
│ ├── Job1-1 (Job1.launch { ... })
│ └── Job1-2 (Job1.launch { ... })
│
├── Job2 (scope.launch { ... })
│ └── Job2-1 (Job2.launch { ... })
│
└── Job3 (scope.launch { ... })
而suspend函数是什么,顾名思义这个函数是可以挂起执行等待条件满足之后再继续执行的函数。kotlin的suspend函数最终会被编译为Continuation<T>,其挂起的本质是将自身这个Continuation传递给被挂起的函数,被挂起的函数条件满足之后调用Continuation的恢复执行方法。继续从正确的位置执行。也就是所谓的CPS。
因此协程和suspend函数的关系,相当于环境和实际逻辑的关系。协程为suspend函数提供了执行的环境,suspend函数在协程中执行。
作用域:顶层中断函数环境
协程: 为子协程提供中断函数环境。
因此从作用域的视角来说,二者可统一理解为中断函数提供环境。
但个人觉得作用域这个设计有好有坏,好处是比较灵活。我们可以在任意层次的suspend函数通过作用域启动新的协程。坏处是这可能违背结构化并发。因为你通过新的作用域启动的协程可能不会伴随根作用域的取消而同步取消!
异常如何在协程层级树中传播?
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
resumeWith是kotlin协程设计用来唤醒的API,并且对于中断函数来说,它的实现也是继承BaseContinuationImpl的。
分析resumeWith我们可以看到在while循环中,有一个try catch逻辑,这里捕获了所有的异常。并且后续判断如果completion不是BaseContinuationImpl的情况,这种情况代表completion是外层的协程环境。这种情况下调用resumeWith会调用到AbstractCoroutine的resumeWith,AbstractCoroutine是协程实现的一个抽象父类。我们来看一下这个方法的实现,这个方法如果是异常的情况下会取消自己,以及取消子协程。并且通知父协程子协程取消了,通过childCancelled方法,如果这个异常不是CancellationException,则会继续往上传递,一直到通过scope.launch启动的顶层协程,这个时候如果我们在这个协程上安装了CoroutineExceptionHandler,那么它会全局地处理协程的异常。
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
// Invoke an exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
return
}
} catch (t: Throwable) {
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}
// If a handler is not present in the context or an exception was thrown, fallback to the global handler
handleCoroutineExceptionImpl(context, exception)
}
CancellationException
CancellationException是特殊的异常。
对于通过CancellationException导致的取消,其最终只会导致suspend直接所归属的协程取消,不会取消父协程。也不会取消兄弟协程。
其他异常取消
之所以说协程的取消是协作机制的,是因为它并不强制中断正在执行的中断函数,它只是修改Job的状态。其他代码块儿通过观察Job的状态,通过注册回调或者观察Job的isActive值来决定是否要继续执行。
取消是两阶段的
- 修改Job的状态为Cancelling
- 等待内部的suspend函数执行完成,调用协程的resumeWith
- 此时才会进入Cancelled状态
理解取消是协作的对编写正确的代码是很重要的,比如如下代码
val result = withTimeoutOrNull(100) {
// long running code
// here using Thread.sleep(100000) as an example
Thread.sleep(100_000)
return@withTimeoutOrNull "success"
}
在代码块中我们通过Thread.sleep(100_000)模拟了一个耗时操作,但是我们期望这个操作要在100毫秒内完成,模拟这个操作是因为在某些安卓rom上,某一个系统API的调用通常很快,但是会偶现十分耗时。我们对这个操作期望它可能会失败。但是不希望它卡在那里。但遗憾的是,上面的代码并不能在100ms之后‘通过超时’机制返回null,原因是因为Thread.sleep(100_000)是不响应协程中断的。根据上文描述的取消两阶段来看。此时整个函数必须等到sleep完成之后才会结束。
作用域的取消
一个协程作用领域本质上也会有一个相联系的Job,这个Job就是这个协程作用域的根。
如果作用域一旦被cancel了,这个时候再启动协程是不会生效的,协程会被立刻取消。我们可以通过CoroutineScope创建我们的协程作用域,如果不希望通过launch的协程抛出的异常取消整个作用域,我们需要通过CoroutineScope(SupervisorJob())的方式创建作用域。
delay函数如何响应取消?
底层依赖suspendCancellableCoroutine提供的CancellableContinuationImpl,这个Continuation会对协程的取消进行感知。取消之后会调用我们通过invokeOnCancellation注册的回调,实现取消机制,以HandlerDispatcher对delay的实现为例
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}
如何处理协程中的异常
有的时候一个中断函数的执行可能抛出异常。通过全局注册CoroutineExceptionHandler,并不能很好地达成我们的目的,它不够灵活。我们希望能够像编写通过同步代码的方式用try catch去处理一个中断函数可能抛出的异常。那么有没有办法呢,有!而且不止一种。
- 方式1: 通过coroutineScope
通过coroutineScope把我们的中断函数包裹起来,外层就可以通过try catch检查异常
- 方式2: 通过async启动协程+await等待