1. Kotlin协程作用
Kotlin协程是一套基于Java Thread的线程框架,最大的特点就是可以1,用同步的方式写出异步代码,并且2,不阻塞当前线程。
2. cps转换
2.1 cps转换示例
//编译前
private suspend fun testCPS(): String {
withContext(Dispatchers.IO) {
return "testCPS"
}
}
//编译后
private final Object testCPS(Continuation $completion) {
return "testCPS";
}
Kotlin 的编译器检测到 suspend 关键字修饰的函数后,会进行cps转换,转换点:
1,函数中增加了一个Continuation类型的参数;
2,函数返回值变为Object(本例String->Object)
注意:这里suspend函数即使没有使用withContext开启一个协程,也会进行cps转换。说明只要suspend函数,不管开不开启协程,编译器都会对其进行cps转换。
2.2 参数Continuation
public interface Continuation<in T> {
public val context: CoroutineContext
// 相当于 onSuccess 结果
// ↓ ↓
public fun resumeWith(result: Result<T>)
}
interface CallBack {
void onSuccess(String response);
}
1,Continuation :续体,可以理解为剩余要执行的代码。协程体中的异步操作被状态机分割成不同的片段,分片段执行,执行完一部分,剩下的部分叫做续体。
2,Continuation 是一个接口,和一般回调接口定义类似,可以判断,协程的思想其实就是回调。
Continuation 定义看一个协程上下文属性context,一个方法声明resumeWith(),用于协程1,启动(DispatchedContinuation),2,挂起时恢复(BaseContinuationImpl),或者3,协程运行完成时的回调(AbstractCoroutine);
2.2 返回值 Object
为什么返回值从String->Object?
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
...
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
可以看出withContext的返回值有两种:
1,如果当前withContext中的异步操作没有完成,返回COROUTINE_SUSPENDED,协程框架根据这个字段挂起协程(其实就是直接return);
2,如果当前withContext中的异步操作已经完成,返回对应操作执行后的返回值(对应String)
3. 协程状态机
3.1 协程状态机
//编译前
class TestCoroutine {
private fun startCoroutine() {
// funTest协程体
val funTest: suspend CoroutineScope.() -> Unit = {
println("funTest")
suspendFun1()
suspendFun2()
}
GlobalScope.launch(Dispatchers.Default, block = funTest)
}
// 挂起函数
suspend fun suspendFun1() {
println("suspendFun1")
}
// 挂起函数
suspend fun suspendFun2() {
println("suspendFun2")
}
}
//编译后
public final class TestCoroutine {
private final void startCoroutine() {
Function2 funTest = (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
TestCoroutine var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
String var2 = "funTest";
boolean var3 = false;
System.out.println(var2);
var10000 = TestCoroutine.this;
this.label = 1;
if (var10000.suspendFun1(this) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var10000 = TestCoroutine.this;
this.label = 2;
if (var10000.suspendFun2(this) == var4) {
return var4;
} else {
return Unit.INSTANCE;
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
});
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getDefault(), (CoroutineStart)null, funTest, 2, (Object)null);
}
@Nullable
public final Object suspendFun1(@NotNull Continuation $completion) {
System.out.println(""suspendFun1"");
return Unit.INSTANCE;
}
@Nullable
public final Object suspendFun2(@NotNull Continuation $completion) {
System.out.println("suspendFun2");
return Unit.INSTANCE;
}
}
(其他文章描述)在反编译的代码中,协程体funTest被编译成一个继承SuspendLambda的类,在类中实现create(),invokeSuspend()两个方法:
create()创建了一个协程体funTest类的实例;
invokeSuspend()方法执行具体的协程操作。
(我反编译的代码)没搞懂的地方:
1,只创建了一个Function2的对象;
2,create方法创建的是匿名对象(anonymous constructor);
3,invoke方法是什么作用
基本原理:
协程体会被编译成一个SuspendLambda的子类,在这个类的invokeSuspend方法中,协程体中的suspend方法会被分割到switch不同的分支中,每个suspend方法会被cps机制转换成带有一个Continuation参数的方法。
通过一个label标签控制分支代码执行,label为0,首先会进入第一个分支,首先将label设置为下一个分支的数值,然后执行第一个suspend方法并传递当前Continuation,得到返回值,如果是COROUTINE_SUSPENDED,协程框架就直接return,协程挂起,当第一个suspend方法执行完成,会回调Continuation的invokeSuspend方法,进入第二个分支执行,以此类推执行完所有suspend方法。
如果suspend方法直接返回执行结果,那invokeSuspend后面的代码怎么执行?
3.2 SuspendLambda类图
4 协程相关概念
4.1 CoroutineContext
launch函数是CoroutineScope的一个扩展函数,CoroutineScope只是一个接口,但是可以通过CoroutineScope的扩展方法进行协程的创建,除了launch函数还有async函数。
CoroutineScope除了通过扩展函数创建协程还有其它两个作用,launch函数返回一个Job对象,可以通过这个Job管理协程,另外CoroutineScope为协程提供一个上下文CoroutineContext。
CoroutineContext协程的上下文,这是一个数据集合接口声明,协程中Job、Dispatcher调度器都可以是它的元素,CoroutineContext有一个非常好的作用就是我们可以通过它拿到Job、Dispatcher调度器等数据。
CombinedContext是CoroutineContext接口的具体实现类,存在两个属性,其中element是一个Element,代表集合的元素,left是一个CoroutineContext,代表链表的下一个节点。
通过CoroutineContext#plus可以看出,CoroutineContext的数据存储方式是一个左向链表,链表的每一个节点是CombinedContext,并且存在拦截器的情况下,拦截器永远是链表尾部的元素,这样设计目的是因为拦截器的使用频率很高,为了更快的读取拦截器;
没看懂这个左向链表实现,现在只要知道这是个集合,类似list,但是它有一个left元素始终在表尾,存储拦截器,
4.2 CoroutineStart 启动模式
CoroutineStart 是协程的启动模式,存在以下4种模式:
DEFAULT 立即调度,可以在执行前被取消
LAZY 需要时才启动,需要start、join等函数触发才可进行调度
ATOMIC 立即调度,协程肯定会执行,执行前不可以被取消
UNDISPATCHED 立即在当前线程执行,直到遇到第一个挂起点(可能切线程)
5. 协程启动
// launch是CoroutineScope的一个扩展函数
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.()
): Job {
// CoroutineContext创建一个新的Context
val newContext = newCoroutineContext(context)
// 启动模式的判断
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch函数存在3个参数:
CoroutineContext 协程的上下文
CoroutineStart 协程的启动模式
suspend CoroutineScope.() -> Unit 协程体
newCoroutineContext()是CoroutineScope的一个扩展方法,它的作用就是将传参context与CoroutineScope中的CoroutineContext集合合并,并返回一个新的CoroutineContext,如果传入Dispatchers.Default,就是将Dispatchers.Default与CoroutineScope中的CoroutineContext合并。
根据启动模式,构建一个AbstractCoroutine的子类(协程对象都继承AbstractCoroutine),如果是默认模式,则创建StandaloneCoroutine,并调用它的start方法。并将StandaloneCoroutine又作为job返回。
5.1 AbstractCoroutine
AbstractCoroutine继承或者实现了JobSupport、Job、Continuation、CoroutineScope。
JobSupport是Job的具体实现,AbstractCoroutine可以作为一个Job控制协程的生命周期,同时实现Continuation接口,也可以作为一个Continuation,重写的resmueWith()方法的一个重要作用是恢复协程
AbstractCoroutine#resmueWith
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
// 子协程未完成,父协程需要等待子协程完成之后才可以完成
if (state === COMPLETING_WAITING_CHILDREN) return
// 子协程全部执行完成或者没有子协程的情况下不需要等待
afterResume(state)
}
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)
// JobSupport#afterCompletion
protected open fun afterCompletion(state: Any?) {}
在AbstractCoroutine#resmueWith中首先根据JobSupport#makeCompletingOnce返回状态判断,协程是否处于等待子协程完成的状态:
state == COMPLETING_WAITING_CHILDREN 等待子协程完成,自身才可完成。子协程完成后,触发afterCompletion()
state != COMPLETING_WAITING_CHILDREN 没有子协程或者所有子协程已经完成,自身可以完成,直接触发afterCompletion()
协程对象可以通过重写afterCompletion()处理协程完成之后的操作,下文中的协程恢复章节中,withContext()中DispatchedCoroutine协程对象,通过afterCompletion()恢复了外层的协程的运行。
AbstractCoroutine#start()
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
...
//block :协程体 //receiver:协程对象 //this:AbstractCoroutine(也是协程对象)
start(block, receiver, this)
}
CoroutineStart#invoke
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
//completion:start传过来的AbstractCoroutine
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
AbstractCoroutine#start()调用了CoroutineStart的invoke()方法,然后根据启动模式调用block对应的协程启动方法,block.startCoroutineCancellable(receiver, completion) 中是一个链式调用流程。
createCoroutineUnintercepted()
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
// probeCompletion :AbstractCoroutine
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
// continuation:AbstractCoroutine
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
...
MainActivity$startCoroutine$funTest$1 mainActivity$startCoroutine$funTest$1 = new MainActivity$startCoroutine$funTest$1(this.this$0, continuation);
return mainActivity$startCoroutine$funTest$1;
}
createCoroutineUnintercepted中通过调用create(probeCompletion)创建了一个协程体类的对象。
createCoroutineUnintercepted()是一个扩展函数,通过协程体block调用,所以源码中this is BaseContinuationImpl的判断中this指协程体类,编译章节中协程体被编译成SuspendLambda的子类;这里的create函数就是SuspendLambda的子类中的create函数。
这里的block对象和协程体类对象是什么关系?有什么区别?
注意看下构造函数的参数continuation,这里continuation就是AbstractCoroutine,在协程体类的继承链中,这个continuation一直传递到了BaseContinuationImpl父类中,用于后续协程的恢复。
注意:这里将AbstractCoroutine对象传递给了协程类对象,进行了第一层代理。
继续分析intercepted()
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
ContinuationImpl#intercepted
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
CoroutineDispatcher#interceptContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
//this是Dispatcher,continuation是协程体类对象
DispatchedContinuation(this, continuation)
intercepted()协程体类对象转换成ContinuationImpl,然后调用了ContinuationImpl的intercepted方法,intercepted方法中调用context[ContinuationInterceptor] 从协程类对象的CoroutineContext集合中取到调度器CoroutineDispatcher(这个CoroutineContext是launch是构建的,并传递到StandaloneCoroutine对象中),并调用调度器CoroutineDispatcher的interceptContinuation(),interceptContinuation()的作用是将协程体Continuation对象包装成一个DispatchedContinuation。
注意:这里将协程类对象传递给了DispatchedContinuationd对象,进行了第二层代理
5.2 CoroutineDispatcher
CoroutineDispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
// 是否需要线程调度
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
// 线程调度,让一个runable对象在指定线程运行
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 将协程体对象continuation封装为一个DispatchedContinuation对象
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
...
}
CoroutineDispatcher的作用是进行任务的线程切换。
CoroutineDispatcher实现了ContinuationInterceptor,代表是一个拦截器;
实现CoroutineContext接口,存储在CoroutineContext的left节点。
CoroutineContext[ContinuationInterceptor]就可以在CoroutineContext集合中获取到拦截器。
为啥要通过拦截器去代理Continuation,直接使用DispatchedContinuation包装不就行了吗??
5.3 DispatchedContinuation
接着分析resumeCancellableWith(Result.success(Unit), onCancellation)
DispatchedContinuation
internal class DispatchedContinuation<in T>(
// 调度器
@JvmField val dispatcher: CoroutineDispatcher,
// 协程体Continuation对象
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// 使用delegate存储当前对象
override val delegate: Continuation<T>
get() = this
// ATOMIC启动模式
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
// 是否需要线程调度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
// dispatch 调度线程,第二个参数是一个Runnable类型,这里传参this也就是DispatchedContinuation自身
// DispatchedContinuation实际上也是一个Runnable对象,调用调度器的dispatch方法之后就可以使这个runnable在指定的线程运行了
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
// 不需要调度,执行协程体的resumeWith
continuation.resumeWith(result)
}
}
}
}
// 默认启动模式
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
}
DispatchedContinuation 代理协程体类对象(SuspendLambda)并持有线程调度器(CoroutineDispatcher),它的作用就是使用线程调度器将协程体调度到指定的线程执行。
DispatchedContinuation也实现了Continuation接口,并重写resumeWith(),首先
根据dispatcher.isDispatchNeeded(context)判断需要线程切换:
1.如果需要线程调度,则调用dispatcher#dispatch进行调度,而dispatch()的第二个参数是一个runnable对象(这里传参为this,即DispatchedContinuation对象本身,DispatchedContinuation同时还实现了Runnable接口),这个runnable就会运行在调度的线程上;
2.不需要调度则直接调用协程体类continuation对象的resumeWith(),前面的章节中提到,协程体的运行就是协程体类Continuation对象的resumeWith()被触发,所以这里就会让协程体在当前线程运行;
另外还有一个方法resumeCancellableWith(),它和resumeWith()的实现很类似,在不同的启动模式下调度线程的方法调用不同。比如默认的启动模式调用resumeCancellableWith(),ATOMIC启动模式则调用resumeWith()。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
可以看到Default和IO 底层还是使用了线程池进行调度;Main使用了handler进行调度。
5.4 DispatchedTask
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
’
// 在DispatchedContinuation中重写了该属性,delegate实际是指DispatchedContinuation对象
internal abstract val delegate: Continuation<T>
public final override fun run() {
...
val delegate = delegate as DispatchedContinuation<T>
// 通过delegate拿到原始协程体Continuation对象
val continuation = delegate.continuation
...
// 调用协程体类对象的resume
continuation.resume(getSuccessfulResult(state))
...
}
}
// Continuation的扩展方法,触发Continuation内的方法resumeWith
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
DispatchedTask的run方法中调用了协程体类对象的resume方法,间接调用了BaseContinuationImpl的resumeWith
注意:这里是协程体类的resumeWith被执行了,不是DispatchedContinuation
5.5 BaseContinuationImpl
internal abstract class BaseContinuationImpl(
// completion:实参是一个AbstractCoroutine
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 调用invokeSuspend方法,协程体真正开始执行
val outcome = invokeSuspend(param)
// invokeSuspend方法返回值为COROUTINE_SUSPENDED,resumeWith方法被return,结束执行,说明执行了挂起操作
if (outcome === COROUTINE_SUSPENDED) return
// 协程体执行成功的结果
Result.success(outcome)
} catch (exception: Throwable) {
// 协程体出现异常的结果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// 在示例代码中,completion是一个AbstractCoroutine,是指launch函数创建的StandaloneCoroutine
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
BaseContinuationImpl定义了一个抽象方法invokeSuspend(),并重写了Continuation的resumeWith(),并在其中调用invokeSuspend(),具体实现就是SuspendLambda的invokeSuspend(),invokeSuspend()方法中便是具体协程任务。
invokeSuspend()和invoke啥关系??
5.6 协程启动流程总结:
以调度器为Dispatchers.Default,启动模式为CoroutineStart.DEFAULT为例:
CoroutineScope#launch()创建一个协程,在其内部实现中根据启动模式为CoroutineStart.DEFAULT,创建一个StandaloneCoroutine协程对象,并触发StandaloneCoroutine#start(start, coroutine, block);
StandaloneCoroutine的父类是AbstractCoroutine,StandaloneCoroutine#start()的实现在其父类中,即AbstractCoroutine#start();
在AbstractCoroutine#start()中,触发CoroutineStart#invoke();
CoroutineStart#invoke()的处理逻辑中,根据调度器为Dispatchers.Default,调用协程体的startCoroutineCancellable()方法;
startCoroutineCancellable()的内部处理是一个链式调用:
createCoroutineUnintercepted(..).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted()创建一个协程体类对象;
intercepted()使用拦截器(调度器)将协程体类对象包装成DispatchedContinuation(DispatchedContinuation代理了协程体类Continuation对象,并持有调度器);
调用DispatchedContinuation#resumeCancellableWith()。
在DispatchedContinuation#resumeCancellableWith()中,使用线程调度器触发dispatcher#dispatch(context, this)进行调度,该调度器为Dispatchers.Default;
Dispatchers.Default#dispatch()调度处理中,将DispatchedContinuation分发到CoroutineScheduler线程池中,由CoroutineScheduler分配一个线程Worker,最终在Woreder的run()方法中触发了DispatchedContinuation的run(),其内部实现是使协程体Continuation对象的resumeWithI()得以执行,前文中分析到协程体的执行其实就是resumeWith()方法被调用,这样协程体就可以在执行的线程中执行了。
协程启动流程图:
5.7 三个协程对象总结:
1,第一层协程对象AbstractCoroutine,主要处理协程状态和恢复挂起协程。
2,第二层对象BaseContinuationImpl,由编译器将我们的代码转换而成,利用状态机实现代码分段执行和协程挂起,并代理第一层对象。在其resumeWith方法中通过调用invokeSuspend()执行我们的任务代码。
注意:resumeWith方法中调用completion.resumeWith(outcome)是恢复协程,这个completion是AbstractCoroutine(launch对应StandaloneCoroutine或者witchContext对应DispatchedCoroutine),不是BaseContinuationImpl的子类。
3,第三层对象DispatchedContinuation,对第二层协程对象进行代理,负责使用dispatcher进行调度任务。
注意:协程体类是持有了一个AbstractCoroutine,不是继承
6.协程挂起
withContext()
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// 返回启动withContext的协程体
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 构建一个新的newContext,合并当前协程体以及withContext协程体的CoroutineContext
val oldContext = uCont.context
val newContext = oldContext + context
// 检查协程是否活跃,如果线程处于非活跃的状态,抛出cancle异常
newContext.checkCompletion()
...
// DispatchedCoroutine也是一个AbstractCoroutine对象,负责协程完成的回调,
// 注意这里的Continuation的传参为uCont,及发起withContext的协程对象
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
// 和协程启动的流程一样,启动withContext的协程
// 注意这里的传参coroutine为DispatchedCoroutine,它持有需要恢复的协程
block.startCoroutineCancellable(coroutine, coroutine)
// 返回结果为挂起还是完成
coroutine.getResult()
}
}
在withContext()的源码可以看到,withContext()的协程体的启动和原有协程的启动流程是一样的,DispatchedCoroutin是AbstractCoroutine的一个子类,并且在创建DispatchedCoroutin时的传参是外层协程体对象,这是因为当withContext()的协程体完成的时候需要通过外层协程体对象恢复当前协程的运行。
先看下协程的挂起coroutine.getResult()的实现。
// DispatchedCoroutine#getResult
fun getResult(): Any? {
// 返回COROUTINE_SUSPENDED,挂起
if (trySuspend()) return COROUTINE_SUSPENDED
val state = this.state.unboxState()
// 出现异常
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
// 未出现异常结果返回
return state as T
}
// DispatchedCoroutine#trySuspend
private val _decision = atomic(UNDECIDED)
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
// compareAndSet原子操作,当前值与预期值一致时返回true,以原子方式更新自身的值
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
是否挂起,结束协程运行,关键在是否返回COROUTINE_SUSPENDED标志,在getResult()方法中的处理逻辑,就是看trySuspend()是否返回true。
trySuspend()方法中,_decision默认为UNDECIDED,预期的参数值传参也为UNDECIDED,所以,trySuspend返回true,最终getResult方法返回了COROUTINE_SUSPENDED,协程被挂起了。
7.协程恢复
withContext()启动一个协程和launch类似,当执行到BaseContinuationImpl的resumeWith方法,调用invokeSuspend得到结果之后,会调用内部代理的completion.resumeWith(outcome)方法,这个completion是DispatchedCoroutine。
DispatchedCoroutine是AbstractCoroutine的子类,当协程完成时会调用它的内部方法resumeWith(),内部的处理逻辑最后会触发JubSpuuort#afterCompletion(),而在DispatchedCoroutine中重写了afterCompletion()。
private class DispatchedCoroutine<in T>(
context: CoroutineContext,
// 外部需要恢复的协程
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun afterCompletion(state: Any?) {
afterResume(state)
}
override fun afterResume(state: Any?) {
// 在getResult()之前,协程已运行结束,未发生挂起,不需要恢复外层协程
if (tryResume()) return
// 获取外部协程的DispatchedContinuation,去恢复外层协程
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
}
uCont.intercepted()获取到外层协程的DispatchedContinuation,然后调用resumeCancellableWith方法,使用外层协程的dispatcher将任务的执行切换到之前的线程中去执行。再次调用到外层协程的BaseContinuationImpl#resumeWith方法,再次调用到外层协程类的invokeSuspend方法中,去执行剩余代码。如果能直接结果就调用外层协程的completion.resumeWith(outcome)结束协程(completion是外层协类对象中代理的AbstractCoroutine)