深究Kotlin协程delay函数源码实现

前言

在开发项目期间 Kotlin 协程是经常使用的异步&并发编程框架。在协程使用过程中,时常会用到挂起函数,而 delay 就是一个挂起函数,在很多业务场景中会使用到,本文通过源码分析了解其背后的实现原理

分析

举个🌰
通常我们的业务场景有这种情况,需要延时执行某些任务

private suspend fun test() {
    findViewLifecycleOwner()?.lifecycleScope?.launch() {
        print("before delay")
        //延时3s
        delay(3000)
        print("after delay")
    }
}

delay 函数则会挂起当前协程,并且会在3s后进行恢复
再来看看 delay 函数的实现

/**
 * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
 *
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

suspendCancellableCoroutine

我们先来看看 suspendCancellableCoroutine 函数,它是 Kotlin 协程库中的一个函数,它是一个挂起函数,用于创建一个可以被取消的挂起点。
这个函数也是协程中经常使用,它可以将异步的回调用同步的方式表达出来,减少回调嵌套

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we setup parent-child relationship immediately
         * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

然而 suspendCancellableCoroutine 函数内部是使用了suspendCoroutineUninterceptedOrReturn 函数实现的,而且也是一个挂起函数

public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

suspendCoroutineUninterceptedOrReturn 函数没有实际内容,是因为它是编译器内建函数,它是由 Kotlin 编译器来实现的。主要作用是传递协程上下文,以及判断是否挂起的,或者直接返回结果
再举个🌰
写个👇方法,然后进行反编译看看

private suspend fun test2(){
    suspendCancellableCoroutine<String> {
        print("suspendCancellableCoroutine test")
    }
}

反编译后

// 传递了 Continuation 上下文 
private final Object test2(Continuation $completion) {
   int $i$f$suspendCancellableCoroutine = false;
   int var4 = false;
   //封装实体
   CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
   cancellable$iv.initCancellability();
   CancellableContinuation it = (CancellableContinuation)cancellable$iv;
   int var7 = false;
   //执行block代码
   String var8 = "suspendCancellableCoroutine test";
   System.out.print(var8);
   //获取协程体返回值
   Object var10000 = cancellable$iv.getResult();
   if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
      DebugProbesKt.probeCoroutineSuspended($completion);
   }
   //判断是否是挂起,是的话返回挂起状态
   return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}

正如上面所说的,suspendCoroutineUninterceptedOrReturn 函数是新增了传递协程上下文,以及判断是否挂起的,或者直接返回结果的逻辑
知道了 suspendCancellableCoroutine 函数的作用后,再回过头看 delay 函数

/**
 * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
 *
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

suspendCancellableCoroutine 函数传递了封装后的协程对象 cont,以及判断 cont 是否会执行挂起

DefaultExecutor

看看 cont.context.delay是啥

//获取delay
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

//默认实现
internal actual val DefaultDelay: Delay = DefaultExecutor

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {

 init {
    incrementUseCount() // this event loop is never completed
 }
    ... 省略很多代码 ...
}

可以看见 cont.context.delay 最终的实现是 DefaultExecutor ,它继承了EventLoopImplBase 和 Runnable
DefaultExecutor 是个单例,里边开启了线程,并且检测队列里任务的情况来决定是否需要挂起线程进行等待
而 scheduleResumeAfterDelay 函数是 EventLoopImplBase 里实现的

public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val timeNanos = delayToNanos(timeMillis)
    if (timeNanos < MAX_DELAY_NS) {
        val now = nanoTime()
        DelayedResumeTask(now + timeNanos, continuation).also { task ->
            continuation.disposeOnCancellation(task)
            schedule(now, task)
        }
    }
}

如果满足时间条件,则创建一个延迟的task,DelayedResumeTask

入队列

看 schedule 函数

public fun schedule(now: Long, delayedTask: DelayedTask) {
    when (scheduleImpl(now, delayedTask)) {
        //unpark() 会启动线程
        SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
        SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
        else -> error("unexpected result")
    }
}

private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
    if (isCompleted) return SCHEDULE_COMPLETED
    val delayedQueue = _delayed.value ?: run {
        _delayed.compareAndSet(null, DelayedTaskQueue(now))
        _delayed.value!!
    }
    //入队列
    return delayedTask.scheduleTask(now, delayedQueue, this)
}

实际就是把 DelayedResumeTask 放进 _delayed 队列里面,并且启动 DefaultExecutor 里的线程(如果没有开启的话)
_delayed 是存储延迟执行 task 的队列
还有一个 _queue 是是存储正常任务的队列

internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)
    ...
    }

既然上面有入队列,就有出队列

出队列

回到刚刚说的启动了 DefaultExecutor 里的线程,看看它的 run 方法

override fun run() {
    ThreadLocalEventLoop.setEventLoop(this)
    registerTimeLoopThread()
    try {
        var shutdownNanos = Long.MAX_VALUE
        if (!notifyStartup()) return
        while (true) {
            Thread.interrupted() // just reset interruption flag
            //取队列核心代码
            var parkNanos = processNextEvent()
            if (parkNanos == Long.MAX_VALUE) {
                // nothing to do, initialize shutdown timeout
                val now = nanoTime()
                if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                val tillShutdown = shutdownNanos - now
                if (tillShutdown <= 0) return // shut thread down
                parkNanos = parkNanos.coerceAtMost(tillShutdown)
            } else
                shutdownNanos = Long.MAX_VALUE
                //如果返回的时间大于0
            if (parkNanos > 0) {
                // check if shutdown was requested and bail out in this case
                if (isShutdownRequested) return
                //挂起线程一段时间
                parkNanos(this, parkNanos)
            }
        }
    } finally {
        _thread = null // this thread is dead
        acknowledgeShutdownIfNeeded()
        unregisterTimeLoopThread()
        // recheck if queues are empty after _thread reference was set to null (!!!)
        if (!isEmpty) thread // recreate thread if it is needed
    }
}

取出队列里的 task 逻辑是在 processNextEvent() 函数中,看看它的实现

override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return 0
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) {
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {
                // 加入正常任务队列
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    val task = dequeue()
    if (task != null) {
        task.run()
        return 0
    }
    //返回线程需要挂起的时间
    return nextTime
}

protected override val nextTime: Long
    get() {
        if (super.nextTime == 0L) return 0L
        val queue = _queue.value
        when {
            queue === null -> {} // empty queue -- proceed
            queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
            queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
            else -> return 0 // non-empty queue
        }
        val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
        return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)

其中流程是:

  1. 有个死循环一直在从 _delayed 队列里取延迟 task,如果判断延迟时间已经到了才会加入正常任务队列里且移除
  2. 直到取不出延迟 task 了才跳出循环
  3. 然后从正常队列里取出任务进行执行
  4. 执行任务就是在执行 DelayedResumeTask 类里的 run 方法
   private inner class DelayedResumeTask(
    nanoTime: Long,
    private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
    override fun run() { with(cont) { resumeUndispatched(Unit) } }
    override fun toString(): String = super.toString() + cont.toString()
}
  1. 可以是看见调用了 resumeUndispatched() 函数,使用协程能经常看见或者使用 resumexxx 函数,它就是协程中的恢复,对应于挂起,它们理应是成对出现的
  2. resumeUndispatched() 函数最终的实现就是一开始封装 Continuation 协程为 CancellableContinuationImpl 的实现,即是 cont 对象,最终恢复了协程的运行
//恢复协程
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dc = delegate as? DispatchedContinuation
    resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}

processNextEvent() 函数的返回值

  • 最后返回下一个 nextDelayedTask 的延迟时间,即是线程挂起时间
  • 如果返回的时间大于0,则最终会调用 LockSupport.parkNanos(),将线程挂起一段时间,直到延迟时间结束。
    源码看到这里就能知道 delay 函数的背后运行原理了,最终是由 DefaultExecutor 线程整体控制挂起和恢复,而且不会阻塞当前调用方的线程。

总结

delay 函数的总体流程是

  1. 创建一个延迟任务(DelayedResumeTask),它包含需要延迟执行的逻辑和执行时间信息。
  2. 将延迟任务添加到延迟队列(_delayed)中。
  3. 在等待延迟任务执行之前,协程会被暂停(挂起)。
  4. 有一个单独的 DefaultExecutor 线程定期检查延迟队列中的任务,如果任务的延迟时间到了,就会将任务从延迟队列中移除,并将其放入执行队列中。
  5. 执行队列中取出任务,并调用其执行逻辑。
  6. 执行任务的过程也就是协程的恢复过程,一旦任务开始执行,协程会被恢复,继续执行其后续的逻辑。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容