Kotlin 协程启动篇:静态代理分层

前段时间在项目中引入了 Kotlin Coroutine,那么也来谈谈对它的理解。所谓窥一斑而知全豹,首先尝试透过一个🌰来窥探协程的启动流程,直接通过 GlobalScope.launch 启动了一个顶级协程并指定在主线程中执行:

fun test() {
    GlobalScope.launch(Dispatchers.Main) {
        Log.d("GlobalScopeTest", "Log in GlobalScope")
    }
    Log.d("GlobalScopeTest", "Log after GlobalScope")
}

上面的代码输出结果为:

Log after GlobalScope
Log in GlobalScope

为什么会出现这样的执行顺序?协程体内的代码逻辑是透过什么机制分发到主线程执行?要解答这些问题,还得从 GlobalScope.launch 寻找突破口。

1、GlobalScope.launch:协程入口

launch 是个扩展函数、启动协程并且返回一个 job 对象。这里先大概了解返回的 job 可以方便地对协程进行追踪,取消等操作。

launch 源码:

public fun CoroutineScope.launch(
   context: CoroutineContext = EmptyCoroutineContext,
   start: CoroutineStart = CoroutineStart.DEFAULT,
   block: suspend CoroutineScope.() -> Unit
): Job {
   val newContext = newCoroutineContext(context)
   // 构建 AbstractCoroutine 对象,并由此开启协程任务
   val coroutine = if (start.isLazy)
       LazyStandaloneCoroutine(newContext, block) else
       StandaloneCoroutine(newContext, active = true)
   coroutine.start(start, coroutine, block)
   return coroutine
}

看了 launch 方法,会有一个疑惑:经过编译后,协程把我们的代码逻辑封装成了一个 SuspendLambda 类,那么必然需要该类的相关信息才可能进行回调执行逻辑,但是在入口处并没有看到相关参数,难道还有什么黑科技能够获取到类相关信息?

事实上反编译 class 文件,发现 CoroutineScope.launch 方法签名和源码实现略有不同,其中多了一个参数:Function2<? super CoroutineScope, ? super Continuation<? super T>, ? extend Object>。 而生成的 SuspendLambda 类就是 Function2 的实现类。

没错,在调用 CoroutineScope.launch 的时候,传入的 Function2 的参数值就是生成的 SuspendLambda 对象!所以在这里并没有什么黑科技,只是由于编译器的参与,隐藏了其中一些细节。

反编译得到的 launch 源码:

@NotNull
public static final Job launch(@NotNull CoroutineScope $this$launch, @NotNull CoroutineContext context, @NotNull CoroutineStart start, @NotNull Function2<? super CoroutineScope, ? super Continuation<? super Unit>, ? extends Object> block) {
   StandaloneCoroutine coroutine;
   Intrinsics.checkParameterIsNotNull($this$launch, "$this$launch");
   Intrinsics.checkParameterIsNotNull(context, "context");
   Intrinsics.checkParameterIsNotNull(start, "start");
   Intrinsics.checkParameterIsNotNull(block, "block");
   CoroutineContext newContext = CoroutineContextKt.newCoroutineContext($this$launch, context);
   if (start.isLazy()) {
        coroutine = new LazyStandaloneCoroutine(newContext, block);
   } else {
       coroutine = new StandaloneCoroutine(newContext, true);
   }
   coroutine.start(start, coroutine, block);
   return coroutine;
}

回到 launch ,方法最后调用 start 启动了协程。层层追踪最后调用了 startCoroutineCancellable:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
    }

观察方法名大概能得出一个判断:

  1. createCoroutineUnintercepted:首先创建了一个非拦截的协程 Continuation。
  2. intercepted:添加拦截器 Continuation。
  3. resume:正式启动协程。

看来要知道更多细节,还要从这几个方法入手。

2、createCoroutineUnintercepted:构建任务层对象

createCoroutineUnintercepted 在 jvm 的实现在 IntrinsicsJvm 类中。在这里,launch 处传入的 Function2 参数起了作用,通过这个对象调用 create() 来构建 编译器生成的 SuspendLambda 对象

@SinceKotlin("1.3")
public fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
         // 创建 SuspendLambda 对象
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

反编译得到的 create 方法 :

@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> completion) {
    Intrinsics.checkParameterIsNotNull(completion, "completion");
    
    // 将Continuation传入作为completion参数值(静态代理)
    Continuation coroutineTest$test$1 = new CoroutineTest$test$1(this.$context, completion);
    CoroutineScope coroutineScope = (CoroutineScope) value;
    coroutineTest$test$1.p$ = (CoroutineScope) value;
    return coroutineTest$test$1;
}

所以 launch 传入的 Function2 参数作用是构建出编译器为我们生成的 SuspendLambda 对象,后续才能回调挂起的逻辑。那么 SuspendLambda.intercepted() 又做了什么?

3、Continuation.intercepted:构建任务调度层对象

追踪 intercepted 调用链,最终调用了 ContinuationImpl.intercepted :

public fun intercepted(): Continuation<Any?> =
    intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

上面代码关键点就在于 (context[ContinuationInterceptor]?.interceptContinuation(this),context[ContinuationInterceptor] 得到的是 HandlerContext 对象(why?)而 handlerContext 是 CoroutineDispatcher 的子类(CoroutineDispatcher 可以理解为线程级别的任务调度类,决定了任务在哪个线程中执行),CoroutineDispatcher.interceptContinuation() 返回了 DispatchedContinuation 对象。

why HandlerContext handlerContext = context[ContinuationInterceptor] ?

栗子中我们调用 launch 传入了 Dispatchers.Main 作为 context,CoroutineContext 可以理解为一个数据结构,其中键为 ContinuationInterceptor,值为 Dispatchers.Main。所以context[ContinuationInterceptor] 得到的是 Dispatchers.Main,即为 HandlerContext。

intercepted 就是通过静态代理得到了 DispatchedContinuation 对象。接下来只需要通过 resumeCancellable 开启协程。

4、DispatchedContinuation.resumeCancellable:开启协程任务

DispatchedContinuation 为任务调度类,将具体任务分配给 Dispatcher(也就是上面通过 context[ContinuationInterceptor] 得到的对象) 执行。resumeCancellable 内部调用 dispatcher.dispatch 将任务交给了 Dispatcher,方法第二个参数 this 指代 runnable 对象。

resumeCancellable 的实现:

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
   // 指定需要线程调度,默认为true
   if (dispatcher.isDispatchNeeded(context)) {
       _state = value
       resumeMode = MODE_CANCELLABLE
       dispatcher.dispatch(context, this)
   } else {
   // 不需要线程调度
       executeUnconfined(value, MODE_CANCELLABLE) {
           if (!resumeCancelled()) {
                resumeUndispatched(value)
           }
       }
   }
}

HandlerContext 的 dispatch 方法实现如下,通过 handler 把任务抛出去:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

DispatchedContinuation 内 runnable.run 方法实现如下:

public final override fun run() {
   val taskContext = this.taskContext
   var exception: Throwable? = null
   try {
       val delegate = delegate as DispatchedContinuation<T>
       val continuation = delegate.continuation
       val context = continuation.context
       val job = if (resumeMode.isCancellableMode) context[Job] else null
       val state = takeState() // NOTE: Must take state in any case, even if cancelled
       withCoroutineContext(context, delegate.countOrElement) {
           if (job != null && !job.isActive) {
               val cause = job.getCancellationException()
               cancelResult(state, cause)
               continuation.resumeWithStackTrace(cause)
           } else {
               val exception = getExceptionalResult(state)
               if (exception != null)
                   continuation.resumeWithStackTrace(exception)
               else
                 // 回调suspendLambda.resumeWith()
                   continuation.resume(getSuccessfulResult(state))
           }
       }
   } catch (e: Throwable) {
        // This instead of runCatching to have nicer stacktrace and debug experience
      exception = e
   } finally {
      val result = runCatching { taskContext.afterTask() }
      handleFatalException(exception, result.exceptionOrNull())
   }
}

run 方法内部通过调用 continuation.resume 回调整条链路的任务逻辑。

continuation.resume 的主要下游调用链路:continuation.resume() -> continuation.resumeWith() -> suspendLambda.invokeuspend() -> AbstractContinue.resumeWith()。

其中 suspendLambda.invokeuspend 内部封装的就是我们栗子中的逻辑: Log.d("GlobalScopeTest", "Log in GlobalScope")

从 launch 开启协程到如何回调到我们的逻辑大致流程已经理清。由于这个栗子指定了主线程作为运行环境。还有更多关于协程如何进行线程调度、多线程下函数挂起、恢复等流程在这里未涉及到。不过我们大概可以推测对于指定IO线程执行的协程体,协程必然维护了自己的线程池作为协程的运行环境。

小结

  1. launch 方法还有一个隐式参数:Function2,也就是编译器生成 SuspendLambda 的子类对象。为后续构建真实可用的 SuspendLambda 对象做铺垫。
  2. 通过静态代理构建了三层 Continuation 分别负责不同的职责:AbstractCoroutine(协程状态处理层) -> SuspendLambda(逻辑执行层) -> CoroutineDispatcher(任务调度层)
  3. 任务最终交由 CoroutineDispatcher 执行。最终脱离不开线程级别的任务调度。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容