Kotlin学习之协程的协程上下文与调度器

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中,它们被定义在了 Kotlin 的标准库里。
协程上下文是各种不同元素的集合。其中主元素是协程中的 Job, 我们在前面的文档中⻅过它以及它的
调度器,而本文将对它进行介绍。

调度器与线程

协程上下文包含一个 协程调度器 (参⻅ CoroutineDispatcher)它确定了哪些线程或与线程相对应的协程执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。
所有的协程构建器诸如 launch 和 async 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。
尝试下面的示例:

    fun main22() = runBlocking<Unit> {
        launch { // context of the parent, main runBlocking coroutine
            println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
            println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
            println("Default               : I'm working in thread ${Thread.currentThread().name}")
        }
        launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
            println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
        }
    }

它执行后得到了如下输出(也许顺序会有所不同):

2020-05-14 16:00:06.778 32275-32275/com.yy.kotlindemo I/System.out: Unconfined            : I'm working in thread main
2020-05-14 16:00:06.792 32275-32275/com.yy.kotlindemo I/System.out: main runBlocking      : I'm working in thread main
2020-05-14 16:00:06.793 32275-32461/com.yy.kotlindemo I/System.out: Default               : I'm working in thread DefaultDispatcher-worker-2
2020-05-14 16:00:06.794 32275-32462/com.yy.kotlindemo I/System.out: newSingleThreadContext: I'm working in thread MyOwnThread
  • 当调用 launch { ...... } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下文(以及调度器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下文。
  • Dispatchers.Unconfined 是一个特殊的调度器且似乎也运行在 main 线程中,但实际上, 它是一种不同的机制,这会在后文中讲到。
  • 该默认调度器,当协程在 GlobalScope 中启动的时候使用, 它代表 Dispatchers.Default 使用了共享的后台线程池, 所以 GlobalScope.launch { ...... } 也可以使用相同的调度器⸺launch(Dispatchers.Default) { ...... } 。
  • newSingleThreadContext 为协程的运行启动了一个线程。 一个专用的线程是一种非常昂贵的资源。在真实的应用程序中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶层变量中使它在整个应用程序中被重用。

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器在调用它的线程启动了一个协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这完全由被调用的挂起函数来决定。非受限的调度器非常适用于执行不消耗 CPU 时间的任务,以及不更新局限于特定线程的任何共享数据(如UI)的协程。
另一方面,该调度器默认继承了外部的 CoroutineScope。 runBlocking 协程的默认调度器,特别是, 当它被限制在了调用者线程时,继承自它将会有效地限制协程在该线程运行并且具有可预测的 FIFO 调度。

    fun main23() = runBlocking<Unit> {
        launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
            println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
            delay(500)
            println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
        }
        launch { // context of the parent, main runBlocking coroutine
            println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
            delay(1000)
            println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
        }
    }

执行后的输出:

2020-05-14 16:18:49.141 1810-1810/com.yy.kotlindemo I/System.out: Unconfined      : I'm working in thread main
2020-05-14 16:18:49.153 1810-1810/com.yy.kotlindemo I/System.out: main runBlocking: I'm working in thread main
2020-05-14 16:18:49.653 1810-1908/com.yy.kotlindemo I/System.out: Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
2020-05-14 16:18:50.191 1810-1810/com.yy.kotlindemo I/System.out: main runBlocking: After delay in thread main

所以,该协程的上下文继承自 runBlocking {...} 协程并在 main 线程中运行,当 delay 函数调用的时候,非受限的那个协程在默认的执行者线程中恢复执行。
非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该在通常的代码中使用。

调试协程与线程

协程可以在一个线程上挂起并在其它线程上恢复。 甚至一个单线程的调度器也是难以弄清楚协程在何时何地正在做什么事情。使用通常调试应用程序的方法是让线程在每一个日志文件的日志声明中打印线程的名字。这种特性在日志框架中是普遍受支持的。但是在使用协程时,单独的线程名称不会给出很多协程上下文信息,所以 kotlinx.coroutines 包含了调试工具来让它更简单。

    fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
    fun main24() = runBlocking<Unit> {
        val a = async {
            log("I'm computing a piece of the answer")
            6
        }
        val b = async {
            log("I'm computing another piece of the answer")
            7
        }
        log("The answer is ${a.await() * b.await()}")
    }

这里有三个协程,包括 runBlocking 内的主协程 (#1) , 以及计算延期的值的另外两个协程 a (#2)和 b (#3)。 它们都在 runBlocking 上下文中执行并且被限制在了主线程内。 这段代码的输出如下:

2020-05-14 16:42:00.011 3397-3397/com.yy.kotlindemo I/System.out: [main] I'm computing a piece of the answer
2020-05-14 16:42:00.012 3397-3397/com.yy.kotlindemo I/System.out: [main] I'm computing another piece of the answer
2020-05-14 16:42:00.012 3397-3397/com.yy.kotlindemo I/System.out: [main] The answer is 42

这个 log 函数在方括号种打印了线程的名字,并且你可以看到它是 main 线程,并且附带了当前正在其上执行的协程的标识符。这个标识符在调试模式开启时,将连续分配给所有创建的协程。

当 JVM 以 -ea 参数配置运行时,调试模式也会开启。 你可以在 DEBUG_PROPERTY_NAME 属性的文档中阅读有关调试工具的更多信息

在不同线程间跳转

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码

    fun main25() {
        newSingleThreadContext("Ctx1").use { ctx1 ->
            newSingleThreadContext("Ctx2").use { ctx2 ->
                runBlocking(ctx1) {
                    log("Started in ctx1")
                    withContext(ctx2) {
                        log("Working in ctx2")
                    }
                    log("Back to ctx1")
                }
            }
        }
    }

它演示了一些新技术。其中一个使用 runBlocking 来显式指定了一个上下文,并且另一个使用withContext 函数来改变协程的上下文,而仍然驻留在相同的协程中,正如可以在下面的输出中所⻅到的。

2020-05-14 18:15:53.193 11334-11500/com.yy.kotlindemo I/System.out: [Ctx1 @coroutine#1] Started in ctx1
2020-05-14 18:15:53.200 11334-11501/com.yy.kotlindemo I/System.out: [Ctx2 @coroutine#1] Working in ctx2
2020-05-14 18:15:53.201 11334-11500/com.yy.kotlindemo I/System.out: [Ctx1 @coroutine#1] Back to ctx1

注意,在这个例子中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候, 它使用了 Kotlin 标准库中的 use 函数来释放该线程。

上下文中的作业

协程的 Job 是上下文的一部分,并且可以使用 coroutineContext [Job] 表达式在上下文中检索它:

println("My job is ${coroutineContext[Job]}")

在调试模式下,它将输出如下这些信息:

System.out: My job is "coroutine#1":BlockingCoroutine{Active}@ab34d69

请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true的一种方便的快捷方式。

子协程

当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程作业的 子作业。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。
然而,当使用 GlobalScope 来启动一个协程时,则新协程的作业没有父作业。 因此它与这个启动的作用域无关且独立运作。

fun main27() = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            // it spawns two other jobs, one with GlobalScope
            GlobalScope.launch {
                println("job1: I run in GlobalScope and execute independently!")
                delay(1000)
                println("job1: I am not affected by cancellation of the request")
            }
            // and the other inherits the parent context
            launch {
                delay(100)
                println("job2: I am a child of the request coroutine")
                delay(1000)
                println("job2: I will not execute this line if my parent request is cancelled")
            }
        }
        delay(500)
        request.cancel() // cancel processing of the request
        delay(1000) // delay a second to see what happens
        println("main: Who has survived request cancellation?")
    }

这段代码的输出如下,

I/System.out: job1: I run in GlobalScope and execute independently!
I/System.out: job2: I am a child of the request coroutine
I/System.out: job1: I am not affected by cancellation of the request
I/System.out: main: Who has survived request cancellation?

父协程的职责

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动,并且不必使用Job.join 在最后的时候等待它们:

   // 启动一个协程来处理某种传入请求(request)
    fun main28() = runBlocking<Unit> {
        // launch a coroutine to process some kind of incoming request
        val request = launch {
            repeat(3) { i -> // // 启动少量的子作业
                launch  {
                    delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                    println("Coroutine $i is done")
                }
            }
            println("request: I'm done and I don't explicitly join my children that are still active")
        }
        request.join() // 等待请求的完成,包括其所有子协程
        println("Now processing of the request is complete")
    }

结果如下所示:

/System.out: Coroutine 0 is done
/System.out: Coroutine 1 is done
/System.out: Coroutine 2 is done
/System.out: Now processing of the request is complete

命名协程以用于调试

当协程经常打印日志并且你只需要关联来自同一个协程的日志记录时, 则自动分配的 id 是非常好的。然而,当一个协程与特定请求的处理相关联时或做一些特定的后台任务,最好将其明确命名以用于调试目的。 CoroutineName 上下文元素与线程名具有相同的目的。当调试模式开启时,它被包含在正在执行此协程的线程名中。
下面的例子演示了这一概念:

    fun main29() = runBlocking(CoroutineName("main")) {
        log("Started main coroutine")
        // run two background value computations
        val v1 = async(CoroutineName("v1coroutine")) {
            delay(500)
            log("Computing v1")
            252
        }
        val v2 = async(CoroutineName("v2coroutine")) {
            delay(1000)
            log("Computing v2")
            6
        }
        log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
    }

程序执行使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I/System.out: [main @main#1] Started main coroutine
I/System.out: [main @v1coroutine#2] Computing v1
I/System.out: [main @v2coroutine#3] Computing v2
I/System.out: [main @main#1] The answer for v1 / v2 = 42

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

   fun main30() = runBlocking<Unit> {
        launch(Dispatchers.Default + CoroutineName("test")) {
            println("I'm working in thread ${Thread.currentThread().name}")
        }
    }

这段代码使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I/System.out: I'm working in thread DefaultDispatcher-worker-1 @test#2

协程作用域

让我们将关于上下文,子协程以及作业的知识综合在一起。假设我们的应用程序拥有一个具有生命周期的对象,但这个对象并不是一个协程。举例来说,我们编写了一个 Android 应用程序并在 Android 的activity 上下文中启动了一组协程来使用异步操作拉取并更新数据以及执行动画等等。所有这些协程必须在这个 activity 销毁的时候取消以避免内存泄漏。当然,我们也可以手动操作上下文与作业,以结合activity 的生命周期与它的协程,但是 kotlinx.coroutines 提供了一个封装:CoroutineScope 的抽象。 你应该已经熟悉了协程作用域,因为所有的协程构建器都声明为在它之上的扩展。
我们通过创建一个 CoroutineScope 实例来管理协程的生命周期,并使它与 activit 的生命周期相关联。CoroutineScope 可以通过 CoroutineScope() 创建或者通过MainScope() 工厂函数。前者创建了一个通用作用域,而后者为使用 Dispatchers.Main 作为默认调度器的 UI 应用程序 创建作用域:

class Activity {
         private val mainScope = MainScope()
         fun destroy() {
            mainScope.cancel()
         }
        // 继续运行...

或者,我们可以在这个 Activity 类中实现 CoroutineScope 接口。最好的方法是使用具有默认工厂函数的委托。 我们也可以将所需的调度器与作用域合并(我们在这个示例中使用Dispatchers.Default)。

class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
// 继续运行......

现在,在这个 Activity 的作用域中启动协程,且没有明确指定它们的上下文。在示例中,我们启动了十个协程并延迟不同的时间:

    // 在 Activity 类中
    fun doSomething() {
// 在示例中启动了 10 个协程,且每个都工作了不同的时⻓
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

在 main 函数中我们创建 activity,调用测试函数 doSomething ,并且在 500 毫秒后销毁这个activity。 这取消了从 doSomething 启动的所有协程。我们可以观察到这些是由于在销毁之后, 即使我们再等一会儿,activity 也不再打印消息。

import kotlinx.coroutines.*

class Activity {
    private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes
    
    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

fun main() = runBlocking<Unit> {
    val activity = Activity()
    activity.doSomething() // run test function
    println("Launched coroutines")
    delay(500L) // delay for half a second
    println("Destroying activity!")
    activity.destroy() // cancels all coroutines
    delay(1000) // visually confirm that they don't work
}

这个示例的输出如下所示:

2020-05-15 14:33:48.635 30433-30433/com.yy.kotlindemo I/System.out: Launched coroutines
2020-05-15 14:33:48.867 30433-30521/com.yy.kotlindemo I/System.out: Coroutine 0 is done
2020-05-15 14:33:49.082 30433-30521/com.yy.kotlindemo I/System.out: Coroutine 1 is done
2020-05-15 14:33:49.181 30433-30433/com.yy.kotlindemo I/System.out: Destroying activity!

你可以看到,只有前两个协程打印了消息,而另一个协程在 Activity.destroy() 中单次调用了job.cancel() 。

线程局部数据

有时,能够将一些线程局部数据传递到协程与协程之间是很方便的。 然而,由于它们不受任何特定线程的约束,如果手动完成,可能会导致出现样板代码。
ThreadLocal, asContextElement 扩展函数在这里会充当救兵。它创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。它很容易在下面的代码中演示:

    fun main32() = runBlocking<Unit> {
        threadLocal.set("main")
        println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
            println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
            yield()
            println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        }
        job.join()
        println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }

在这个例子中我们使用 Dispatchers.Default 在后台线程池中启动了一个新的协程,所以它工作在线程池中的不同线程中,但它仍然具有线程局部变量的值, 我们指定使用threadLocal.asContextElement(value = "launch") , 无论协程执行在什么线程中都是没有问题的。 因此,其输出如(调试)所示:

I/System.out: Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
I/System.out: Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
I/System.out: After yield, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
I/System.out: Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

这很容易忘记去设置相应的上下文元素。如果运行协程的线程不同, 在协程中访问的线程局部变量则可能会产生意外的值。 为了避免这种情况,建议使用 ensurePresent 方法并且在不正确的使用时快速失败。
ThreadLocal 具有一流的支持,可以与任何 kotlinx.coroutines 提供的原语一起使用。 但它有一个关键限制,即:当一个线程局部变量变化时,则这个新值不会传播给协程调用者(因为上下文元素无法追踪所有 ThreadLocal 对象访问)
,并且下次挂起时更新的值将丢失。 使用 withContext 在协程中更新线程局部变量,详⻅ asContextElement。
另外,一个值可以存储在一个可变的域中,例如 class Counter(var i: Int) ,是的,反过来, 可以存储在线程局部的变量中。然而,在这个案例中你完全有责任来进行同步可能的对这个可变的域进行的并发的修改。
对于高级的使用,例如,那些在内部使用线程局部传递数据的用于与日志记录 MDC 集成,以及事务上下文或任何其它库,请参⻅需要实现的 ThreadContextElement 接口的文档。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352