【kotlin】- 携程的执行流程

简介

这篇文章将从源码的角度,分析携程的执行流程,我们创建一个携程,系统是怎么进行调度的,什么时候执行的,是否需要创建新线程等等,带着这些疑问,一起往下看吧。

例子先行

fun main(): Unit = runBlocking {
    launch {
        println("${treadName()}======1")
    }
    GlobalScope.launch {
        println("${treadName()}======3")
    }
    launch {
        println("${treadName()}======2")
    }
    println("${treadName()}======4")
    Thread.sleep(2000)
}

输出如下:

DefaultDispatcher-worker-1======3
main======4
main======1
main======2

Process finished with exit code 0

根据打印,如果根据单线程执行流程来看,是不是感觉上面的日志打印顺序有点不好理解,下面我们就逐步来进行分解。

  • runBlocking携程体
    这里将其它代码省略到了,我这里都是按照一条简单的执行流程进行讲解。

    public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    
        val eventLoop: EventLoop?
        val newContext: CoroutineContext
        ...
        if (contextInterceptor == null) {
            eventLoop = ThreadLocalEventLoop.eventLoop
            newContext = GlobalScope.newCoroutineContext(context + eventLoop)
        }
        ...
        val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
        coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
        return coroutine.joinBlocking()
    }
    

    看一下eventLoop的初始化,会 在当前线程(主线程)创建BlockingEventLoop对象。

    internal val eventLoop: EventLoop
          get() = ref.get() ?: createEventLoop().also { ref.set(it) }
    
    internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
    

    看一下newContext初始化,这里会对携程上下文进行组合,返回新的上下文。最后返回的是一个BlockingEventLoop对象。

    public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
       val combined = coroutineContext + context
        val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
        return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
          debug + Dispatchers.Default else debug
    }
    

    开始对携程进行调度

     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    

    看一下执行这句代码之前,各变量的值

    111.png

    而上面的代码最终调用的是CoroutineStart.DEFAULTinvoke方法。

      public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
          when (this) {
              DEFAULT -> block.startCoroutineCancellable(completion)
              ATOMIC -> block.startCoroutine(completion)
              UNDISPATCHED -> block.startCoroutineUndispatched(completion)
              LAZY -> Unit // will start lazily
          }
    

    我们使用的是DEFAULT启动模式。然后会执行resumeCancellableWith方法。

      inline fun resumeCancellableWith(
          result: Result<T>,
          noinline onCancellation: ((cause: Throwable) -> Unit)?
      ) {
          val state = result.toState(onCancellation)
          if (dispatcher.isDispatchNeeded(context)) {
              _state = state
              resumeMode = MODE_CANCELLABLE
              dispatcher.dispatch(context, this)
          } else {
              executeUnconfined(state, MODE_CANCELLABLE) {
                  if (!resumeCancelled(state)) {
                      resumeUndispatchedWith(result)
                  }
              }
          }
      }
    

    dispatcherBlockingEventLoop对象,没有重写isDispatchNeeded,默认返回true。然后调用dispatch继续进行分发。BlockingEventLoop继承了EventLoopImplBase并调用其dispatch方法。把任务加入到队列中。

    public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
    

    回到最开始,在coroutine.start(CoroutineStart.DEFAULT, coroutine, block)执行完,还执行了coroutine.joinBlocking()看一下实现。

        fun joinBlocking(): T {
          registerTimeLoopThread()
          try {
              eventLoop?.incrementUseCount()
              try {
                  while (true) {
                      @Suppress("DEPRECATION")
                      if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                      val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                      // note: process next even may loose unpark flag, so check if completed before parking
                      if (isCompleted) break
                      parkNanos(this, parkNanos)
                  }
              } finally { // paranoia
                  eventLoop?.decrementUseCount()
              }
          } finally { // paranoia
              unregisterTimeLoopThread()
          }
          // now return result
          val state = this.state.unboxState()
          (state as? CompletedExceptionally)?.let { throw it.cause }
          return state as T
      }
    

    执行val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE,取出任务进行执行,也就是runBlocking携程体。

  • launch {} 执行流程

    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job {
        val newContext = newCoroutineContext(context)
        val coroutine = if (start.isLazy)
            LazyStandaloneCoroutine(newContext, block) else
            StandaloneCoroutine(newContext, active = true)
        coroutine.start(start, coroutine, block)
        return coroutine
    }
    

    因为launch是直接在runBlocking(父携程体)里新的创建的子携程体,所以执行流程上和之前将的差不多,只不过不会像runBlocking再去创建BlockingEventLoop对象,而是直接用runBlocking(父携程体)的,然后把任务加到里面,所以通过这种方式其实就是单线程对任务的调度而已。所以在runBlocking(父携程体)内通过launch启动再多的携程体,其实都是在同一线程,按照任务队列的顺序执行的。

根据上面日志输出,并没有先执行两个launch携程体,这是为什么呢,根据上面的讲解,应用知道,runBlocking(父携程体)是第一被添加的队列的任务,其次是launch,所以是这样的顺序。那可以让launch立即执行吗?答案是可以的,这就要说携程的启动模式了。

  • CoroutineStart 是协程的启动模式,存在以下4种模式:

    1. DEFAULT 立即调度,可以在执行前被取消
    2. LAZY 需要时才启动,需要start、join等函数触发才可进行调度
    3. ATOMIC 立即调度,协程肯定会执行,执行前不可以被取消
    4. UNDISPATCHED 立即在当前线程执行,直到遇到第一个挂起点(可能切线程)

    我们使用UNDISPATCHED就可以使携程体马上在当前线程执行。看一下是怎么实现的。看一下实现:

使用这种启动模式执行UNDISPATCHED -> block.startCoroutineUndispatched(completion)方法。

internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
    startDirect(completion) { actualCompletion ->
        withCoroutineContext(completion.context, null) {
            startCoroutineUninterceptedOrReturn(actualCompletion)
        }
    }
}

大家可以自己点击去看一下,大概就是会立即执行携程体,而不是将任务放入队列。

但是GlobalScope.launch却不是按照这样的逻辑,这是因为GlobalScope.launch启动的全局携程,是一个独立的携程体了,并不是runBlocking(父携程体)子携程。看一下通过GlobalScope.launch有什么不同。

  • GlobalScope.launch执行流程
    1. 启动全局携程
    GlobalScope.launch
    
    newCoroutineContext(context)返回Dispatchers.Default对象。而DefaultScheduler继承了ExperimentalCoroutineDispatcher类。看一下ExperimentalCoroutineDispatcher中的dispatch代码:
     override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
         ...
             coroutineScheduler.dispatch(block)
         ...
    
    看一下coroutineScheduler初始化
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    CoroutineScheduler实现了Executor接口,里面还有两个全局队列和线程池相关的参数。
    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()
    
    继续调用CoroutineScheduler中的dispatch方法
      fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
          trackTask() // this is needed for virtual time support
          val task = createTask(block, taskContext)
          // try to submit the task to the local queue and act depending on the result
          val currentWorker = currentWorker()
          val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
          if (notAdded != null) {
              if (!addToGlobalQueue(notAdded)) {
                  // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                  throw RejectedExecutionException("$schedulerName was terminated")
              }
          }
          val skipUnpark = tailDispatch && currentWorker != null
          // Checking 'task' instead of 'notAdded' is completely okay
          if (task.mode == TASK_NON_BLOCKING) {
              if (skipUnpark) return
              signalCpuWork()
          } else {
              // Increment blocking tasks anyway
              signalBlockingWork(skipUnpark = skipUnpark)
          }
      }
    
    1. val task = createTask(block, taskContext)包装成TaskImpl对象。
    2. val currentWorker = currentWorker()当前是主线程,运行程序时由进程创建,肯定不是Worker对象,Worker是一个继承了Thread的类 ,并且在初始化时都指定为守护线程
      Worker存在5种状态:
      CPU_ACQUIRED 获取到cpu权限
      BLOCKING 正在执行IO阻塞任务
      PARKING 已处理完所有任务,线程挂起
      DORMANT 初始态
      TERMINATED 终止态
      
  1. val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)由于currentWorker是null,直接返回task对象。
  2. addToGlobalQueue(notAdded)根据任务是否是阻塞任务,将task添加到全局任务队列中。这里被添加到globalCpuQueue中。
  3. 执行signalCpuWork()来唤醒一个线程或者启动一个新的线程。
    fun signalCpuWork() {
      if (tryUnpark()) return
      if (tryCreateWorker()) return
      tryUnpark()
  }
 private fun tryCreateWorker(state: Long = controlState.value): Boolean {  
     val created = createdWorkers(state)// 创建的的线程总数  
     val blocking = blockingTasks(state)// 处理阻塞任务的线程数量  
     val cpuWorkers = (created - blocking).coerceAtLeast(0)//得到非阻塞任务的线程数量  
     if (cpuWorkers < corePoolSize) {// 小于核心线程数量,进行线程的创建  
         val newCpuWorkers = createNewWorker()  
         if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()// 当前非阻塞型线程数量为1,同时核心线程数量大于1时,再进行一个线程的创建,  
         if (newCpuWorkers > 0) return true  
     }  
     return false  
 }  
   
  // 创建线程  
  private fun createNewWorker(): Int {  
     synchronized(workers) {  
         ...  
         val created = createdWorkers(state)// 创建的的线程总数  
         val blocking = blockingTasks(state)// 阻塞的线程数量  
         val cpuWorkers = (created - blocking).coerceAtLeast(0) // 得到非阻塞线程数量  
         if (cpuWorkers >= corePoolSize) return 0//超过最大核心线程数,不能进行新线程创建  
        if (created >= maxPoolSize) return 0// 超过最大线程数限制,不能进行新线程创建  
         ...  
         val worker = Worker(newIndex)  
         workers[newIndex] = worker  
         require(newIndex == incrementCreatedWorkers())  
         worker.start()// 线程启动  
         return cpuWorkers + 1  
     }  
 }

那么这里面的任务又是怎么调度的呢,当全局任务被执行的时候,看一下Worker中的run方法:

 override fun run() = runWorker()

执行runWorker方法,该方法会从队列中找到执行任务,然后开始执行。详细代码,可以自行翻阅。

所以GlobalScope.launch使用的就是线程池,没有所谓的性能好。

  • Dispatchers调度器
    Dispatchers是协程中提供的线程调度器,用来切换线程,指定协程所运行的线程。,上面用的是默认调度器Dispatchers.Default

Dispatchers中提供了4种类型调度器:
Default 默认调度器:适合CPU密集型任务调度器 比如逻辑计算;
Main UI调度器
Unconfined 无限制调度器:对协程执行的线程不做限制,协程恢复时可以在任意线程;
IO调度器:适合IO密集型任务调度器 比如读写文件,网络请求等。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容

  • 为什么要搞出和用协程呢 是节省CPU,避免系统内核级的线程频繁切换,造成的CPU资源浪费。好钢用在刀刃上。而协程是...
    静默的小猫阅读 640评论 0 2
  • kotlin 1.3出来了,而协程(coroutines)也正式发布稳定版。虽然目前项目不是kotlin语言,但为...
    小小的coder阅读 951评论 0 0
  • [TOC] 简介 Coroutines are computer program components that ...
    Whyn阅读 5,914评论 5 15
  • 协程(Coroutine) 协程引入 异步加载图片 普通代码:val view = ...loadImageAsy...
    晨起清风阅读 1,273评论 0 1
  • 什么是协程? 官方描述:协程通过将复杂性放入库来简化异步编程。程序的逻辑可以在协程中顺序地表达,而底层库会为我们解...
    pureChild阅读 842评论 0 1