理解kotlin协程基础特性

前言

为什么要写这篇文章?在开发Kotlin或Compose时,我们无法避免与协程打交道,例如常用的LaunchedEffect、网络请求、磁盘数据读取等等。在LaunchedEffect的官方文档中,都离不开一个词——“协程”。那么协程究竟是什么呢?我们有必要仔细了解一番!

一、 协程是什么?

kotlin官方是这样来描述协程的:
A coroutine is an instance of a suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.
这段话中有几个关键信息:

  • 可挂起暂停的;
  • 和线程执行作业比较类似,它可以执行一段代码,且支持和其他代码块并行执行;
  • 协程不绑定线程,在一个生命周期内可以在A线程执行也可以在B线程执行;

看到这个信息,协程和线程那么相似,为什么还要弄一个协程出来增加理解和使用成本呢?Kotlin给出的理由是“Coroutines are less resource-intensive than JVM threads”,即协程比线程占用更少的资源,并且在线程资源耗尽时,可以继续使用协程。

二、协程简单使用

在了解到协程的基本特征之后,我们来看看如何简单的使用协程(非compose场景)(官方例子):

fun main() {
    println("1")
    submain()
    println("2")
}
fun submain() = runBlocking {
    doWorld()
    println("Done")
}

suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}

对应的输出:

1
Hello
World 1
World 2
Done
2

我们逐个来看看上面代码中提到的关键字,看看使用它们的上层场景是什么?后面我们再去分析他们的实现源码。

首先是runBlocking,它会起一个新的协程,但是会阻塞当前线程的执行直到协程中的任务执行完成,我们从输出的内容也能看到1和2之间是包含了runBlocking里面的任务输出的。

其次是suspend修饰符,被suspend修饰的函数在作为调用者时可以调用其他suspend修饰的函数(调用方式和普通函数没有区别);作为被调用者时可以在协程环境下被调用,不能在非协程环境和suspend环境下调用。如果调用会出现如下错误:

Suspend function 'doWorld' should be called only from a coroutine or another suspend function

接着看一下coroutineScope,从字面意思也能看出来它会创建一个协程的环境(在这个环境内我们可以调用suspend函数和其他协程)。kotlin给出了一个针对该scope的一个关键信息:

coroutineScope内的任何子任务失败都会导致scope整体失败,也就会导致scope内其他的子任务失败

我们上面看到runBlocking会起一个协程,coroutineScope也会起一个协程。他们之间的区别是什么?runBlocking会将当前线程阻塞,也就是这个线程什么事儿也做不了了。而coroutineScope只是当前任务挂起,线程是可以继续执行其他任务的。

最后是launch,它会构建一个协程,但是并不会阻塞当前的程序执行流。可以将它理解为起了一个异步的任务。我们从上面的输出能看出来Hello是早于World 1/2的。

看完了基础实现使用(后面会介绍一些常用的用法),我们下面来仔细分析一下协程的定义上面提出的特性。

三、 可挂起特性

挂起是Kotlin协程中的核心概念,它允许我们在不阻塞线程的情况下挂起函数的执行,并在适当的时候恢复执行。因此这里有两个关键的能力:

    1. 挂起
    1. 恢复

我们分别这两个特性来看一下具体的实现

3.1 挂起

仅从个人理解出发明确一下“挂起”的概念,指定的任务在特定的程序执行流(还没有接触协程,可以简单把这里理解为虚拟线程)中运行:

  • 挂起,指的是任务暂定执行,而程序执行流本身并没有停止。主体是任务:

  • 阻塞,指的是执行任务的执行流被暂停或者停止,自然任务本身也无法继续执行(除非派发到其他程序执行流)。主体是执行流或者线程:

理解这两个的区别很重要!挂起并不会导致线程或者程序执行流无法处理任务,而是任务本身没有继续执行而已,阻塞会导致程序执行流被中断,而无法接收其他任务。

好了下面我们从源码的角度来看看。来看个例子(来源于Kotlin language specification,我这边稍微做了一下修改,去掉了await()):

suspend fun coroutineTestMain() {
    val a = a()
    val y = foo(a) // suspension point #1
    b()
    val z = bar(a, y) // suspension point #2
    c(z)
}

当然,上面的代码需要放在suspend函数或者协程作用域下才能调用。面对这段代码,它究竟做了些什么可以让串行的执行流能够挂起呢?几乎无从下手,能想到的就是编译器在编译过程中进行了相关的处理。

这时候有两个思路,首先是查看Kotlin前端生成的FIR,看看里面有什么线索(这需要编写Kotlin的编译插件)。

比如有如下代码(不用上面的代码做例子,是因为真的没有什么变化):

suspend fun testSuspendFunction() = coroutineScope {
    launch {
        delay(1000)
    }
}

生成的IR结果:

suspend fun testSuspendFunction(): Job {
    return coroutineScope {
        $this$coroutineScope.launch {
            delay(1000L)
        }
    }
}

很显然suspend没啥变化,差异只是coroutineScope的语法糖。

我们尝试使用第二种方案:直接反编译class文件。反编译之后得到的内容差异很大,这个内容的差异很大一部分是来自kotlin编译器的CPS转换,所以下面我们先了解一下CPS

3.1.1 Continuation Passing Style (CPS)

每一个suspend函数都会经过编译器的CPS转换,CPS可以理解为一个编译器的编译时转换。比如有一个suspend函数定义:

suspend fun fucntion_name(p1:Int, p2:Int, p3:Int ... pn:Int): T

它有个p1, p2, ... pn共n个参数,以及返回值类型为T。经过CPS转换后会存在n+1个参数,新增参数类型为kotlin.coroutines.Continuation<T>(这里的T就是原函数的返回值类型T),且返回值变为kotlin.Any?:

suspend fun fucntion_name(p1:Int, p2:Int, p3:Int ... pn:Int, continuation: Continuation<T>): Any

这里为什么是返回Any而原返回值类型呢?这是因为suspend函数不仅可以返回该函数对应返回类型的数据,还可以返回一个COROUTINE_SUSPENDED常量:

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

注意:

返回COROUTINE_SUSPENDED时表示该函数将会被挂起

kotlin明确指出我们是不能在suspendCoroutineUninterceptedOrReturn调用之外使用COROUTINE_SUSPEND变量,这可能会触发意想不到的结果:

suspendCoroutineUninterceptedOrReturn<Any>{
    return@suspendCoroutineUninterceptedOrReturn COROUTINE_SUSPENDED
}

我们常用的suspendCancellableCoroutine就是基于它实现的,并增加了cancel的能力。

回到coroutineTestMain例子,来看看经过CPS转换之后的变化(Java表示,反编译之后内容过多我这里做了一下删减)。首先是:

public static final Object coroutineTestMain(@NotNull Continuation<? super Unit> continuation) {
    if ((continuation instanceof TestCoroutineKt$coroutineTestMain$1) == false) goto L7;
    TestCoroutineKt$coroutineTestMain$1 testCoroutineKt$coroutineTestMain$1 = (TestCoroutineKt$coroutineTestMain$1) continuation;
    if ((testCoroutineKt$coroutineTestMain$1.label & Integer.MIN_VALUE) == 0) goto L7;
    testCoroutineKt$coroutineTestMain$1.label -= Integer.MIN_VALUE;
L7:
    testCoroutineKt$coroutineTestMain$1 = new TestCoroutineKt$coroutineTestMain$1(continuation);
    goto L8
L8:
    Object $result = testCoroutineKt$coroutineTestMain$1.result;
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch(testCoroutineKt$coroutineTestMain$1.label) {
        case 0: goto L10;
        case 1: goto L14;
        case 2: goto L19;
        default: goto L23;
    };
...
L18:
    return coroutine_suspended;
...
}

可以看到coroutineTestMain新增了continuation参数且返回值类型变为了Object。从代码逻辑来看,它首先会判断continuation是否是"TestCoroutineKtcoroutineTestMain1"类的实例:如果不是会跳转到L7初始化对应的实例;

如果是的话会更新label的值(如果最高位为0则认为是异常状态,则跳转到L7重新生成实例。否则将最高位的1移除,通过减去MIN_VALUE实现)。


很明显第一次进入该函数会命中第一个条件判断,并跳转到L7去执行初始化操作。

"TestCoroutineKt$coroutineTestMain$1"类我们并没有声明,来看一下编译器生成的类是什么样的:

final class TestCoroutineKt$coroutineTestMain$1 extends ContinuationImpl {
    int I$0;
    /* synthetic */ Object result;
    int label;

    TestCoroutineKt$coroutineTestMain$1(Continuation<? super TestCoroutineKt$coroutineTestMain$1> continuation) {
        super(continuation);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return TestCoroutineKt.coroutineTestMain(this);
    }
}

该类继承自ContinuationImpl(最终都是继承自抽象基类BaseContinuationImp),它包含有label和result属性(在coroutineTestMain函数里面使用到了这两个属性),并且重写了invokeSuspend方法。在该方法内部调用了上面我们定义的coroutineTestMain方法(此时传入this作为continuation的实参):

suspend fun coroutineTestMain() {
    val a = a()
    val y = foo(a) // suspension point #1
    b()
    val z = bar(a, y) // suspension point #2
    c(z)
}

这个变化kotlin的Asynchronous programming with coroutines里面也有提到:

Each suspendable lambda is compiled to a continuation class, with fields representing its local variables, and an integer field for current state in the state machine

因此截止目前我们大概能看到CPS转换做的事情:

    1. suspend函数参数会默认增加一个Continuation参数;
    1. suspend函数的返回值修改为Any,支持返回函数声明时类型的返回值,也支持返回COROUTINE_SUSPENDED常量;
    1. 每一个suspend函数都会生成一个继承自BaseContinuationImp的子类,并且含有label属性和invokeSuspend方法;
    1. 反编译之后内部存在基于label的switch的跳转逻辑;

继续看一下上面反编译之后的coroutineTestMain函数,L8分支的switch case语句基于label跳转到不同的执行分支。那他们的作用是什么呢?实际上他们就是实现挂起和恢复能力关键——状态机!

3.1.2 状态机

kotlin中Coroutine state machine有介绍状态机的内容,这里我还是以我们上的例子和反编译之后的代码来看一下状态机。简化coroutineTestMain函数只与状态机有关的代码:

public static final Object coroutineTestMain(@NotNull Continuation<? super Unit> continuation) {
    Object $result = testCoroutineKt$coroutineTestMain$1.result;
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch(testCoroutineKt$coroutineTestMain$1.label) {
        case 0: goto L34;
        case 1: goto L38;
        case 2: goto L43;
        default: goto L47;
    };
L34: 
    ResultKt.throwOnFailure($result);
    int a = a();
    testCoroutineKt$coroutineTestMain$1.I$0 = a;
    testCoroutineKt$coroutineTestMain$1.label = 1;
    Object obj = foo(a, testCoroutineKt$coroutineTestMain$1);
    if (obj == coroutine_suspended) goto L37;
L39: 
    int y = ((Number) obj).intValue();
    b();
    testCoroutineKt$coroutineTestMain$1.label = 2;
    Object obj2 = bar(a, y, testCoroutineKt$coroutineTestMain$1);
    if (obj2 == coroutine_suspended) goto L42;
L44: ---
    int z = ((Number) obj2).intValue();
    c(z);
    return Unit.INSTANCE;
L42: 
    return coroutine_suspended;
L37: 
    return coroutine_suspended;
L38: 
    a = testCoroutineKt$coroutineTestMain$1.I$0;
    ResultKt.throwOnFailure($result);
    obj = $result;
    goto L39
L43: 
    ResultKt.throwOnFailure($result);
    obj2 = $result;
    goto L44
L47: 
    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

从上面的代码可以看到根据不同的label值执行不同的goto跳转,并且在对应的分支内也会更新label的值(这里先不讲为什么会重复调用coroutineTestMain函数,下面将“恢复”会详细解释)。我们根据代码逻辑可以简单画一个状态转换图:



关于状态机这块儿,还有个问题:比如上面的L39跳转到L42已经返回了COROUTINE_SUSPENDED,那为什么还能继续执行其他分支呢?这个就和恢复能力有关!

3.2 恢复

我们还是以coroutineTestMain函数为例,经过CPS转换之后coroutineTestMain函数会生成一个继承自BaseContinuationImpl的类:

// ContinuationImpl继承自BaseContinuationImpl
public final class TestCoroutineKt$coroutineTestMain$1 extends ContinuationImpl {
    int I$0;
    /* synthetic */ Object result;
    int label;

    TestCoroutineKt$coroutineTestMain$1(Continuation<? super TestCoroutineKt$coroutineTestMain$1> continuation) {
        super(continuation);
    }

    @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return TestCoroutineKt.coroutineTestMain(this);
    }
}

BaseContinuationImpl类实现了resumeWith方法,我们看看resumeWith方法的实现:

public final override fun resumeWith(result: Result<Any?>) {
    ...
    val outcome: Result<Any?> =
        try {
            val outcome = invokeSuspend(param)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(outcome)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
    ...
}

而invokeSuspend会再一次调用TestCoroutineKt.coroutineTestMain方法并传入了this,因此我们得以重复执行coroutineTestMain方法。上图中从Suspend到Lx的过程调用了resume,也就是恢复了。

3.2.1 Continuation

那如何处理的连续suspend函数调用并继续恢复执行呢?kotlin实现挂起点继续执行的关键就是Continuation接口:

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

context是协程执行的上下文,包括Dispachter、继承的Job以及异常处理。resumeWith方法用于协程的恢复处理,BaseContinuationImpl处理栈结构的函数调用的恢复处理。

看个例子:

suspend fun foo2(a: Int): Int {
    return suspendCoroutineUninterceptedOrReturn<Int> {
        continuation = it
        Thread.sleep(2000)
        it.resume(a + 1)
        return@suspendCoroutineUninterceptedOrReturn COROUTINE_SUSPENDED
    }
}

suspend fun foo1(a: Int): Int {
    return foo2(a+ 2)
}

suspend fun foo(a: Int): Int {
    val result = foo1(a + 1)
    a() // 非suspend函数
    return result
}

经过CPS转换之后这里会生成2个continuation类,foo2由于是直接调用的suspendCoroutineUninterceptedOrReturn且在函数内部直接执行了resume,没有需要继续执行的内容,因此这里并不会生成一个continuation。这两个类分别是(这里的是我写代码的文件TestCoroutine.Kt):

  • TestCoroutineKt$foo1$1:由于这里是直接调用了foo2函数,并没有其他的执行动作因此它并不一定会生成对应的类。经过测试,如果在该函数内部执行其他操作(比如用一个局部变量保留foo2的值,并返回这个局部变量)相对复杂的或者调用其他的挂起函数,这个类是会生成的;
  • TestCoroutineKt$foo$1;

他们的相互调用会形成一个类似函数调用的栈结构:


摘出来对应的生成实例的代码:

Object obj = foo1(a + 1, testCoroutineKt$foo$1);
public static final Object foo1(int a, @NotNull Continuation<? super Integer> continuation2) {
    ...
    testCoroutineKt$foo1$1 = new TestCoroutineKt$foo1$1(continuation2);
}
public abstract class BaseContinuationImpl implements Continuation<Object>, CoroutineStackFrame,Serializable {
    ...
    public BaseContinuationImpl(@Nullable Continuation<Object> continuation) {
        this.completion = continuation;
    }
}

然后在BaseContinuationImpl的resumeWith方法内部使用了while-true的方式来调用每一个completion的invokeSuspend方法:

public final void resumeWith(@NotNull Object result) {
        Object m9663constructorimpl;
        Object outcome;
        Object current = this;
        Object obj = result;
        while (true) {
            Object param = obj;
            DebugProbesKt.probeCoroutineResumed((Continuation) current);
            BaseContinuationImpl $this$resumeWith_u24lambda_u240 = (BaseContinuationImpl) current;
            Continuation completion = $this$resumeWith_u24lambda_u240.completion;
            Intrinsics.checkNotNull(completion);
            try {
                outcome = $this$resumeWith_u24lambda_u240.invokeSuspend(param);
            } catch (Throwable exception) {
                Result.Companion companion = Result.Companion;
                m9663constructorimpl = Result.m9663constructorimpl(ResultKt.createFailure(exception));
            }
            if (outcome == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                return;
            }
            Result.Companion companion2 = Result.Companion;
            m9663constructorimpl = Result.m9663constructorimpl(outcome);
            Object outcome2 = m9663constructorimpl;
            $this$resumeWith_u24lambda_u240.releaseIntercepted();
            if (completion instanceof BaseContinuationImpl) {
                current = completion;
                obj = outcome2;
            } else {
                completion.resumeWith(outcome2);
                return;
            }
        }
    }

可以看最后的current的赋值操作,我们从图中可以知道completion是指向调用者生成continuation实例(比如在foo1中,其completion指向的foo的continuation实例)。这样在while循环中,就能触发foo的invokeSuspend方法的调用,继而调用真实的suspend函数。实现了完整的恢复流程调用!

四、 协程与线程的关系

我们在文章的开头已经提到过,协程与线程是存在联系的。通过kotlin官方文档我们知道Coroutines是在CoroutineContext的上下文中执行,所以这个“联系”是通过CoroutineContext上下文关联起来的:

CoroutineContext是通常情况下包含有Job,dispatcher以及异常处理的集合(可以理解为表征协程的信息集合)。我们要了解和线程的联系只需要看Context中的Dispatcher:

// public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
val defaultDiispatcher = Dispatchers.Default
launch(defaultDiispatcher) {
    delay(1000)
}

4.1 dispatcher和线程的关系

dispatcher用来指定在哪个线程中派发协程的任务,任务可以限制在指定的线程中执行,也可以在线程池中进行调度。
kotlin中预定义了4种Dispachter:

  • Dispatchers.Default:当我们在调用launch或者其他开启协程没有指定Dispachter时,默认会使用Default。所有的任务会共享一个公共的后台线程池,适用做计算密集型的事务;
  • Dispatchers.IO:在需要的时候创建共享线程池,用于执行IO密集型事务;
  • Dispatchers.Main:对应主线程(常用更新UI);
  • Dispatchers.Unconfined:无限制,kotlin在明确要求我们在开发业务代码时不应使用这个Dispachter。

上述的这些Dispachter定义了协程的任务应该在什么线程中执行,所以我们来具体看看这几个Dispachter的实现是什么样的。

4.2 Default Dispachter实现

我们分别来看一下在类Darwin、JVM和其他平台的实现。

4.2.1 类Darwin Native

类Darwin平台出现在Apple的操作系统中,比如macOS、iOS和watchOS等等。

首先来看Default在Darwin下的实现:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    autoreleasepool {
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.convert(), 0u)) {
            block.run()
        }
    }
}

通过调用GCD中的dispatch_get_global_queue全局队列来实现的Default。相信iOS开发同学对这个是十分熟悉的,而对于其他平台的同学可以将dispatch_get_global_queue理解为一个全局线程池里面寻找一个空闲可用的并发任务队列,如果不存在则会新建一个队列。

4.2.2 JVM

在JVM下面Default是基于SchedulerCoroutineDispatcher子类的实例:

internal open class SchedulerCoroutineDispatcher() : ExecutorCoroutineDispatcher() {
    private var coroutineScheduler = createScheduler()
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
    override fun close() {
        coroutineScheduler.close()
    }
    @Synchronized
    internal fun shutdown(timeout: Long) {
        coroutineScheduler.shutdown(timeout)
    }
}

真正执行dispatch操作的是CoroutineScheduler实例,在其内部也定义了一个Worker:

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

Worker在JVM下面实际上就是Thread类的子类:

internal inner class Worker private () : Thread() {
    val localQueue: WorkQueue = WorkQueue()
    var nextParkedWorker: Any? = NOT_IN_STACK
}

在Worker的内部维护了一个WorkerQueue,这个WorkQueue是什么呢?

4.2.2.1 WorkerQueue

它是一个半FIFO的队列用于存储woker需要处理的task,可以按照正常的先进先出执行队列中的任务,同时也可以提高某一个任务的优先级让其不遵循FIFO的排队方式:

internal class WorkQueue {
    // 常规队列
    private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
    private val producerIndex = atomic(0)
    private val consumerIndex = atomic(0)
    // 特殊调度任务
    private val lastScheduledTask = atomic<Task?>(null)
}

贴一个简单的示意图:


producerIndex为生产者下标,consumerIndex为消费者下标,这两个值只会增加并不会减少,也就是最多保有128个任务,其中内部针对是否是blocking任务(比如IO密集型的任务)做了区分,以便执行部分操作。

在上图中我们看到了一个“特殊调度任务”,没错就是一个(不能再多了)。我们一般会优先处理这个调度任务(比如前面提到的某哦一个高优先级任务)。

WorkerQueue对外支持的操作有:



因此我们可以把它理解为一个包含不公平处理(特殊任务)的任务队列即可,支持添加任务和获取任务。

trySteal这里需要简单说一下(后面将Worker获取需要执行的任务时会用到这个时间),如果根据stealingMode没有在常规队列中找到合适的任务,那么会去特殊任务队列中窃取,当然这个窃取当然也要满足Mode是否匹配:

// 完整代码在WorkerQueue的trySteal方法内
val time = schedulerTimeSource.nanoTime()
// 查看提交和当前时间的差值
val staleness = time - lastScheduled.submissionTime
// WORK_STEALING_TIME_RESOLUTION_NS为100000纳秒
if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
    return WORK_STEALING_TIME_RESOLUTION_NS - staleness
}

如果Mode匹配之后,会查看当前任务的提交时间和此时窃取时间差是否大于100000纳秒,如果不大于,则会返回调用者需要等到指定时间差值之后再来窃取。

4.2.2.2 Worker

现在来看看管理WorkerQueue的Worker,我们在前面也提到了Worker在JVM环境下是继承自Thread的子类,而且它是CoroutineScheduler的内部类,所以Woker会调用很多CoroutineScheduler的属性或者方法,当遇到了CoroutineScheduler的内容时我会简单带一句。

先说明一个前提,在CoroutineScheduler中维护了一个workers数组

它包含了已经创建的Woker列表。而Woker类本身有一个indexInArray属性用于标识其在wokers数组中的索引:

internal inner class Worker private constructor() : Thread() {
    @Volatile // volatile for push/pop operation into parkedWorkersStack
    var indexInArray = 0
        set(index) {
            name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
            field = index
        }
    val localQueue: WorkQueue = WorkQueue()
}

如果该值为0表示对应的Woker已经被终止,那肯定要问了数组的下标不都是从0开始的吗?这里kotlin为此将wokers[0]作为一个哨兵对象(可以将其理解为一个boundary边界,iOS开发同学应该对于autoreleasepoolPage的boundary有点印象),将其值设置null。也就是说wokers的size最小为1。

接着来看一下woker的执行逻辑:

private fun runWorker() {
    var rescanned = false
    while (!isTerminated && state != WorkerState.TERMINATED) {
        val task = findTask(mayHaveLocalTasks)
        if (task != null) {
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            executeTask(task)
            continue
        } else {
            mayHaveLocalTasks = false
        }
        if (minDelayUntilStealableTaskNs != 0L) {
            if (!rescanned) {
                rescanned = true
            } else {
                rescanned = false
                tryReleaseCpu(WorkerState.PARKING)
                interrupted()
                LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                minDelayUntilStealableTaskNs = 0L
            }
            continue
        }
        tryPark()
    }
    tryReleaseCpu(WorkerState.TERMINATED)
}

通过一个While循环来驱动,只有当Woker处于终止状态或者CoroutineScheduler处于终止状态时整个循环才会停止(isTerminated是CoroutineScheduler的属性,用于表示其是否终止)。

接着会调用findTask寻找合适的任务,它在获取到CPU的执行权限后去获取全局或者本地queue中的任务:

private fun findAnyTask(scanLocalQueue: Boolean): Task? {
    ...
    if (scanLocalQueue) {                
        val globalFirst = nextInt(2 * corePoolSize) == 0
        if (globalFirst) pollGlobalQueues()?.let { return it }
        localQueue.poll()?.let { return it }
        if (!globalFirst) pollGlobalQueues()?.let { return it}
    } else {
        pollGlobalQueues()?.let { return it }
    }
    ...
}

调用nextInt随机生成小于2*corePoolSize的值,在if语句里面调用了两次pollGlobalQueues和一次localQueue。这么做有50%的概率(是否等于0)能够随机取到特定的任务(无论是global还是local),这样能够有效避免线程饥饿导致某些任务永远无法执行(比如优先级反转或者资源分配不均之类的)。

如果没有找到任务,wokers数组里面找mode为STEAL_ANY的任务。我们在介绍WokerQueue的时候特意提到了trySteal里面窃取等待时间的逻辑:任务提交到此时窃取的时间差值小于100微秒是无法窃取的,该方法会返回一个需要等待的时间。提交时间在CoroutineScheduler创建任务时赋值(4.2.2.3节createTask源码时有提到submissionTime这个值):

fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
    ...
    return tryStealLastScheduled(stealingMode, stolenTaskRef)
}
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
    ...
    val time = schedulerTimeSource.nanoTime()
    val staleness = time - lastScheduled.submissionTime
    if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
        return WORK_STEALING_TIME_RESOLUTION_NS - staleness
    }
    ...
}

而这里恰好用到了这个时间:

private fun trySteal(stealingMode: StealingMode): Task? {
    ...
    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
    ...
    minDelay = min(minDelay, stealResult)
    ...
    minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
    return null
}

minDelayUntilStealableTaskNs就是保留的下一次窃取任务需要等到的时间。

在查找任务阶段还会尝试去获取blocking任务(所谓的阻塞任务,我们可以把它理解为IO密集型任务,与之相对应的就是CPU密集型任务):

private fun findBlockingTask(): Task? {
    return localQueue.pollBlocking()
        ?: globalBlockingQueue.removeFirstOrNull()
        ?: trySteal(STEAL_BLOCKING_ONLY)
}

回到runWoker方法,我们调用findTask如果找到了对应的任务,会将rescanned和minDelayUntilStealableTaskNs重置,并执行对应的任务:

private fun executeTask(task: Task) {
    ...
    if (task.isBlocking) {
        if (tryReleaseCpu(WorkerState.BLOCKING)) {
            signalCpuWork()
        }
        runSafely(task)
        decrementBlockingTasks()
        val currentState = state
        if (currentState !== WorkerState.TERMINATED) {
            state = WorkerState.DORMANT
        }
    } else {
        runSafely(task)
    }
}

首先判断任务是否是阻塞任务(IO密集型,网络或者磁盘读取),如果是就完全没必要占用CPU资源了,它会尝试去释放CPU,将资源让出来给其他处于Parked状态的Woker执行任务。并且降低CoroutineScheduler中阻塞任务的数量,这个数量是维护在一个64位整型的第21位到第42位共21位长的bit中,关于这个我们后面介绍CoroutineScheduler时在详细介绍。

任务执行完成,会尝试将Woker的状态调整为Parking。

好吧,上面的流程是我们能够正常获取到需要执行的任务。那我们没有获取到需要执行的任务怎么办?这里就会用到前面得到的minDelayUntilStealableTaskNs变量:

private fun runWorker() {
    ...
    if (minDelayUntilStealableTaskNs != 0L) {
        if (!rescanned) {
            rescanned = true
        } else {
            rescanned = false
            tryReleaseCpu(WorkerState.PARKING)
            interrupted()
            LockSupport.parkNanos(minDelayUntilStealableTaskNs)
            minDelayUntilStealableTaskNs = 0L
        }
        continue
    }
    ...
}

如果我们在findTask的时候窃取任务失败,且给了我们的等待时间的话。查找失败的当次循环(rescanned为false)会将rescanned设置为true以进入下一次循环,在这次循环中会调用parkNanos等待执行时长接着执行循环继续查找任务。

最后在任务执行完成之后会调用release释放CPU,并且将状态更新为TERMINATED终止态。下面是大致的一个状态流转:

关于Woker还有其他操作,但是会涉及到Worker间的操作逻辑。所以我们下面详细看调度器的时候再进行补充。

4.2.2.3 调度器Scheduler

因此真正执行任务是交给了worker,而调度这些Worker怎么工作则是通过CoroutineScheduler实现的。

Scheduler内部一共有四种队列来保留派发过来的任务:

  1. 全局CPU密集型任务队列
  2. 全局IO密集型任务队列
  3. 局部CPU密集型任务队列(Worker)
  4. 局部IO密集型任务队列(Worker)

这里全局队列实现相对没有那么复杂,因此在具体调度操作用到时我们再介绍。着重来看看局部的Worker在Scheduler中是如何组织以及如何调度的。

Scheduler持有了一个私有的属性controlState用于存储当前已经添加的任务数量和已经创建的线程(这里对应Woker,我们上面有提到过JVM下面Woker是继承自Thread的子类)数量:


所以我们看到关于这些计数的方法实现或者属性的实现都是在操作对应的位:

private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)
private inline fun decrementBlockingTasks() {
    controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}

类似的Scheduler内部还有个parkedWorkersStack(parked状态可以理解为Worker处于空闲可随时被唤醒工作的状态)用来记录保留栈顶指针,这里官方也有解释为什么要用栈而不是队列,是由于栈是后进先出的策略,这比较符合局部性原理(时间或者空间上的局部性,空间局部性就是指的在内存或者高速缓存中相邻的数据更容易被访问到。时间上的局部可以理解为相隔较近的两个任务他们的关联性比较大,频繁调用的可能性更高)。

private val parkedWorkersStack = atomic(0L)

parkedWorkersStack的低21位用于记录栈顶的位置,它是一个WorkerQueue的数组下标。这里提到的数组就是Scheduler内部维护的一个变量:

@JvmField
val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

特别提示一下workers[0]是没有数据的,它是作为一个哨兵来使用(我们在介绍Worker的时候也有提到可以回去看看),也就是说真实数据的下标从1开始。


从上图我们可以看出来,栈结构的维护还依赖了一个nextParkedWorker指针,该指针是属于Worker内属性:

/**
 * Reference to the next worker in the [parkedWorkersStack].
 * It may be `null` if there is no next parked worker.
 * This reference is set to [NOT_IN_STACK] when worker is physically not in stack.
 */
@Volatile
var nextParkedWorker: Any? = NOT_IN_STACK

nextParkedWorker是我们不得不提到的一个能力,可以将它理解为一个指针,它记录了当前woker指向的下一个处于parked状态(一个有指定暂定时长的线程状态,调用park进入状态,调用unPark退出状态)的woker。这个值是由CoroutineScheduler类来维护的,因此我们先知道存在这个能力即可。

入栈的时候会首先将栈顶的Worker找到,并将即将入栈的Woker的nextParkedWorker设置为当前栈顶Worker:

fun parkedWorkersStackPush(worker: Worker, oldIndex: Int, newIndex: Int) {
    ...
    val index = (top and PARKED_INDEX_MASK).toInt()
    val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
    val updIndex = worker.indexInArray
    worker.nextParkedWorker = workers[index]
    ...
}

然后将栈顶指针指向最新的Worker:


上图中红色是我们新建的Worker,新建之后就会将其放入到Worker数组中,因此上面我们是直接通过indexInArray来读取的下标。

出栈的操作和入栈相反,会先将找到栈顶Worker以及它的nextParkerWorker。接着会将栈顶的指针指向nextParkerWorker:

private fun parkedWorkersStackPop(): Worker? {
    ...
    val index = (top and PARKED_INDEX_MASK).toInt()
    val worker = workers[index] ?: return null // stack is empty
    val updIndex = parkedWorkersStackNextIndex(worker)
    parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())
    ...
}

在继续后面的内容之前,我们需要有个前提:

任务是先派发后执行的!

来看看任务是如何进行派发的,我们可以看到dispatch方法:

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false)

传入的任务block是一个Runnable的实例,因此我们会先创建一个Task任务:

fun createTask(block: Runnable, taskContext: TaskContext): Task {
    val nanoTime = schedulerTimeSource.nanoTime()
    if (block is Task) {
        block.submissionTime = nanoTime
        block.taskContext = taskContext
        return block
    }
    return block.asTask(nanoTime, taskContext)
}

这里的submissionTime就是前面我们在4.2.2.2节获取Worker时用到的任务提交时间。

接着会查看当前任务是否是阻塞任务,如果是阻塞任务会增加阻塞任务的计数,并将该任务提交到当前线程Worker的局部队列。提交失败会将其提交到全局队列里面。

如果连全局队列都添加失败,会尝试性的唤醒栈顶Parked Worker,这里仅仅只是唤醒Worker并不是马上让其执行任务。如果栈顶没有Worker还会新建Worker用于接受任务的派发。

任务的执行是基于线程调度的,当线程处于就绪状态且被JVM调度时,run 方法将在这个新线程的上下文中运行。我们重写了run方法:

override fun run() = runWorker()

关于runWorker的实现我们在4.2.2.2节已经看了,这里就不细讲了。

最后是Scheduler终止,这里需要等到所有的任务已经完成且线程本身已经终止:

if (!_isTerminated.compareAndSet(false, true)) return

获取当前已经创建的Workers数量,确保即将处理的Worker不是当前正在执行的Worker,其他Worker会等待对应的任务执行完成之后将queue剩下的任务放到全局队列中:

fun shutdown(timeout: Long) {
    ...
    val currentWorker = currentWorker()
    val created = synchronized(workers) { createdWorkers }
    for (i in 1..created) {
        val worker = workers[i]!!
        if (worker !== currentWorker) {
            while (worker.getState() != Thread.State.TERMINATED) {
                LockSupport.unpark(worker)
                worker.join(timeout)
            }
            assert { worker.state === WorkerState.TERMINATED }
            worker.localQueue.offloadAllWorkTo(globalBlockingQueue)
        }
    }
    ...
}

在最后将当前Worker和全局队列中的尚未执行的任务全部执行:

globalBlockingQueue.close()
globalCpuQueue.close()
while (true) {
    val task = currentWorker?.findTask(true)
        ?: globalCpuQueue.removeFirstOrNull()
        ?: globalBlockingQueue.removeFirstOrNull()
        ?: break
    runSafely(task)
}

因此我们能知道在JVM环境下,如果使用Dispachter.Default派发任务的话,我们的任务执行是在非主线程,且Worker基于栈的形式接收任务。而真正的执行依赖Thread自身的调度(如果调用了dipatch方法会默认执行,并在任务执行完成之后将Worker Park)。

4.2.3 其他Native

现在来看看非Darwin平台的实现逻辑。

override fun dispatch(context: CoroutineContext, block: Runnable) {
    ctx.dispatch(context, block)
}

这里的ctx是通过调用newFixedThreadPoolContext函数创建的MultiWorkerDispatcher实例

private class MultiWorkerDispatcher(
    private val name: String,
    private val workersCount: Int
) : CloseableCoroutineDispatcher() {
    // 计数器,dispatch会增加该计数器,worker执行会消费该计数器
    private val tasksAndWorkersCounter = atomic(0L)
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
    private val workerPool = OnDemandAllocatingPool(workersCount) {
        Worker.start(name = "$name-$it").apply {
            executeAfter { workerRunLoop() }
        }
    }
    ...
}

我们看到声明了一个workerPool,可以将其理解为一个Worker的共享池,定义了可以持有的最大Worker数量,需要调用allocate来执行上面代码中start:

Worker.start(name = "$name-$it").apply {
    executeAfter { workerRunLoop() }
}

能看到还声明了tasksQueue和availableWorkers两个Channel,在创建taskQueue和availableWorkders时传入的参数都是UNLIMITED,所以他们都是BufferedChannel实例。其作用我们可以理解为外部环境(调用协程的环境)通过Channel将协程的任务传递给workerPool中的Worker来执行对应的任务。

派发任务是通过dispatch函数实现的:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    // getAndUpdate是先获取值,然后更新值
    val state = tasksAndWorkersCounter.getAndUpdate {
        if (it.isClosed())
            throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
        it + 2
    }
    if (state.hasWorkers()) {
        obtainWorker().resume(block)
    } else {
        workerPool.allocate()
        val result = tasksQueue.trySend(block)
        checkChannelResult(result)
    }
}

在方法最前面state的值为0,然后会将tasksAndWorkersCounter增加2,在最初始的情况下hasWorkers会返回false,因此会执行else分支调用allocate创建一个Worker实例,这里会立即开启worker。接着执行trySend往通道中发送数据,这里是待执行的block任务块。

我们在上面的Worker start看到其真正执行是在workerRunLoop中:

private fun workerRunLoop() = runBlocking {
    while (true) {
        val state = tasksAndWorkersCounter.getAndUpdate {
            if (it.isClosed() && !it.hasTasks()) return@runBlocking
            it - 2
        }
        if (state.hasTasks()) {
            tasksQueue.receive().run()
        } else {
            try {
                suspendCancellableCoroutine {
                    val result = availableWorkers.trySend(it)
                    checkChannelResult(result)
                }.run()
            } catch (e: CancellationException) {
                /** we are cancelled from [close] and thus will never get back to this branch of code,
                but there may still be pending work, so we can't just exit here. */
            }
        }
    }
}

由于我们在dispatch派发了任务,所以state为2,并将tasksAndWorkersCounter减2(这时该值为0)。因此会命中hasTasks判断,即tasksQueue通道会接收并执行block任务块。

上面的代码有一个while(true)的循环,也就是说任务执行之后Worker还会继续执行。上面我们说到此时tasksAndWorkersCounter为0(即state为0),不会命中下面的if条件:

private fun workerRunLoop() = runBlocking {
    ...
    val state = tasksAndWorkersCounter.getAndUpdate {
        if (it.isClosed() && !it.hasTasks()) return@runBlocking
        it - 2
    }
    ...
}

此时会继续执行减2的逻辑,这时tasksAndWorkersCounter值更新为-2。要命中hasTasks是需要counter的值大于2(state为0),这明显不满足。因此会执行:

suspendCancellableCoroutine {
    val result = availableWorkers.trySend(it)
    checkChannelResult(result)
}.run()

这里调用了一个挂起函数suspendCancellableCoroutine,由于内部只是做了trySend。因此整个while循环会等到Channel调用了resume之后才会重新执行。那为什么会挂起呢?

因为使用了workerRunLoop使用runBlocking起了一个会阻塞当前线程执行的协程。所以workerRunLoop会在协程环境中执行,suspend会将当前协程挂起。

我们回过头来继续看dispatch函数,当下一个协程任务块派发到了Default:

private fun obtainWorker(): CancellableContinuation<Runnable> = availableWorkers.tryReceive().getOrNull() ?: runBlocking { availableWorkers.receive() }
override fun dispatch(context: CoroutineContext, block: Runnable) {
    ...
    if (state.hasWorkers()) {
        // there are workers that have nothing to do, let's grab one of them
        /// 存在已经创建的空闲worker,直接调用resume使用该worker,不需要重新allocate
        obtainWorker().resume(block)
    }
    ...
}

hasWorkers需要tasksAndWorkersCounter小于0,现在条件满足(按照我们的分析顺序此时该值为-2)。接着调用availableWorkers的resume重启Channel并执行block任务块。这样做能够尽可能的减少Worker实例的创建。

整个过程就像是创建的协程任务按照Dispachter将其分发给不同的Worker去执行对应的任务,分发的方式就是通过管道Channel(send和recive)。

4.3 Main Dispachter实现

和Default一样,我们需要分析不同的平台的大致实现原理。

4.3.1 类Darwin Native

接着来看一下Main的实现,Main支持通过injectMain来进行注入。默认的实现是调用GCD中的dispatch_get_main_queue函数:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    autoreleasepool {
        dispatch_async(dispatch_get_main_queue()) {
            block.run()
        }
    }
}

4.3.2 JVM

JVM下面首先会通过FAST_SERVICE_LOADER_ENABLED开关查看是否支持FastServiceLoader,如果支持会通过FastServiceLoader反射加载否则通过ServiceLoader。

在FastServiceLoader针对安卓是加载的AndroidDispatcherFactory

internal class AndroidDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    }
    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

这里会获取主线程的Looper然后调用asHandler传递给HandlerContext构造函数,下面是asHandler的实现:

internal fun Looper.asHandler(async: Boolean): Handler {
     ...
    val constructor: Constructor<Handler>
    try {
        constructor = Handler::class.java.getDeclaredConstructor(Looper::class.java,
            Handler.Callback::class.java, Boolean::class.javaPrimitiveType)
    } catch (ignored: NoSuchMethodException) {
        // Hidden constructor absent. Fall back to non-async constructor.
        return Handler(this)
    }
    return constructor.newInstance(this, null, true)
}

4.3.3 其他Native

而Main Dispachter并没有给出具体实现,需要由外部进行注入:

private object MissingMainDispatcher : MainCoroutineDispatcher() {
    override val immediate: MainCoroutineDispatcher
        get() = notImplemented()
    override fun dispatch(context: CoroutineContext, block: Runnable) = notImplemented()
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = notImplemented()
    override fun dispatchYield(context: CoroutineContext, block: Runnable) = notImplemented()
    private fun notImplemented(): Nothing = TODO("Dispatchers.Main is missing on the current platform")
}

外部注入是通过调用injectMain方法进行注入:

@PublishedApi
internal fun injectMain(dispatcher: MainCoroutineDispatcher) {
    injectedMainDispatcher = dispatcher
}

4.4 IO Dispathcer实现

对于IO的实现仅仅区分了是否是Native还是JVM的环境,因此先来看看Native是如何实现IO的派发的。

4.4.1 Native

Dispatcher.IO的定义如下:

public actual object Dispatchers {
    ...
    internal val IO: CoroutineDispatcher = DefaultIoScheduler
}
public actual val Dispatchers.IO: CoroutineDispatcher get() = IO

这里用的DefaultIoScheduler内部通过调用newFixedThreadPoolContext来生成共享线程池:

public actual fun newFixedThreadPoolContext(
    nThreads: Int,
    name: String
): CloseableCoroutineDispatcher {
    require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
    return MultiWorkerDispatcher(name, nThreads)
}

可以看到这个MultiWorkerDispatcher类我们4.2.3节已经仔细阅读过对应的源码了,这里就不再次说明了,总体来说是:

创建存在多个Worker的线程池,并且最大限度地将所有Worker利用起来的线程池。IO的任务会放到共享线程池中

其限制了最大的线程数为64:

private val io = unlimitedPool.limitedParallelism(64)

但是我们可以使用limitedParallelism修改最大的线程数量,比如:

val dbDispatcher = Dispatchers.IO.limitedParallelism(100)

4.4.2 JVM

在JVM环境下的IO声明如下:

@JvmStatic
public val IO: CoroutineDispatcher get() = DefaultIoScheduler
public actual val Dispatchers.IO: CoroutineDispatcher get() = Dispatchers.IO

在DefaultIoScheduler中通过UnlimitedIoScheduler来创建调度器:

internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    ...
}

UnlimitedIoScheduler内通过DefaultScheduler来进行任务调度,经过层层调用最终我们在4.2.2节里面提到的SchedulerCoroutineDispatcher。

我们从IO的内部实现发现其和Default的实现十分类似,比较明显的差异是在实现的dispatch方法上面:

override fun dispatchYield(context: CoroutineContext, block: Runnable) {
    DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
    DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}

对于IO的派发传入的Context是BlockingContext,其定义如下:

internal const val NonBlockingContext: TaskContext = false
internal const val BlockingContext: TaskContext = true

在读取其是否为阻塞任务实现:

internal inline val Task.isBlocking get() = taskContext

五、后记

前面的内容大致上将kotlin协程特征的原理进行了解读,有助于大家更进一步理解kotlin协程(当然不看这些也能使用协程)。
但kotlin协程的内容远不止此,比如协程构建、协程作用域、Job、通道以及Flow等等内容没办法在一篇文章中深入细致的解读

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容