Kotlin协程源码分析(一)

参考资料

搞个例外,先推荐几篇很好的资料,不然理解会比较困难,不从基础讲起了。

Kotlin Coroutine 原理解析

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

深入理解 Kotlin Coroutine (一)

CoroutineContext

协程的上下文。可以通过context[CoroutineContext.Element]拿到当前context关联的任意element,后面要看到的拦截器也是靠interface ContinuationInterceptor : CoroutineContext.Element与上下文关联起来的

plus

对于context,我们会经常看到:

Dispatchers.IO + job

@NotNull
  public final CoroutineContext invoke(@NotNull CoroutineContext paramCoroutineContext, @NotNull CoroutineContext.Element paramElement)
  {
    Intrinsics.checkParameterIsNotNull(paramCoroutineContext, "acc");
    Intrinsics.checkParameterIsNotNull(paramElement, "element");
    CoroutineContext localCoroutineContext1 = paramCoroutineContext.minusKey(paramElement.getKey());
    if (localCoroutineContext1 == EmptyCoroutineContext.INSTANCE) {
      return (CoroutineContext)paramElement;
    }
    ContinuationInterceptor localContinuationInterceptor = (ContinuationInterceptor)localCoroutineContext1.get((CoroutineContext.Key)ContinuationInterceptor.Key);
    CombinedContext localCombinedContext;
    if (localContinuationInterceptor == null)
    {
      localCombinedContext = new CombinedContext(localCoroutineContext1, paramElement);
    }
    else
    {
      CoroutineContext localCoroutineContext2 = localCoroutineContext1.minusKey((CoroutineContext.Key)ContinuationInterceptor.Key);
      if (localCoroutineContext2 == EmptyCoroutineContext.INSTANCE) {
        localCombinedContext = new CombinedContext((CoroutineContext)paramElement, (CoroutineContext.Element)localContinuationInterceptor);
      } else {
        localCombinedContext = new CombinedContext((CoroutineContext)new CombinedContext(localCoroutineContext2, paramElement), (CoroutineContext.Element)localContinuationInterceptor);
      }
    }
    return (CoroutineContext)localCombinedContext;
  }

Continuation

BaseContinuationImpl

传入一个completion作为协程结束后的操作行为。

BaseContinuationImpl.png
resumeWith 精华部分

首先自己invokeSuspend(paramObject)

如果返回IntrinsicsKt.getCOROUTINE_SUSPENDED()说明需要暂停

public final void resumeWith(@NotNull Object paramObject)
  {
    DebugProbesKt.probeCoroutineResumed((Continuation)this);
    BaseContinuationImpl localBaseContinuationImpl1 = (BaseContinuationImpl)this;
    Continuation localContinuation;
    Object localObject2;
    for (Object localObject1 = paramObject;; localObject1 = localObject2)
    {
      BaseContinuationImpl localBaseContinuationImpl2 = localBaseContinuationImpl1;
      localContinuation = localBaseContinuationImpl2.completion;
      if (localContinuation == null) {
        Intrinsics.throwNpe();
      }
      try
      {
        Object localObject3 = localBaseContinuationImpl2.invokeSuspend(localObject1);
        if (localObject3 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
          return;
        }
        localObject2 = Result.constructor-impl(localObject3);
      }
      catch (Throwable localThrowable)
      {
        localObject2 = Result.constructor-impl(ResultKt.createFailure(localThrowable));
      }
      localBaseContinuationImpl2.releaseIntercepted();
      if (!(localContinuation instanceof BaseContinuationImpl)) {
        break;
      }
      localBaseContinuationImpl1 = (BaseContinuationImpl)localContinuation;
    }
    localContinuation.resumeWith(localObject2);
  }

ContinuationImpl

base的基础上加上intercepted()

SafeContinuation

suspend时会在外面套一层safeContinuation,判断当前是继续执行还是resume

public SafeContinuation(@NotNull Continuation<? super T> paramContinuation)
  {
    this(paramContinuation, CoroutineSingletons.UNDECIDED);
  }

resumeWith
 public void resumeWith(@NotNull Object paramObject)
  {
    for (;;)
    {
      Object localObject = this.result;
      if (localObject == CoroutineSingletons.UNDECIDED)
      {
        if (!RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, paramObject)) {}
      }
      else
      {
        if (localObject != IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
          break;
        }
        if (RESULT.compareAndSet(this, IntrinsicsKt.getCOROUTINE_SUSPENDED(), CoroutineSingletons.RESUMED))
        {
          this.delegate.resumeWith(paramObject);
          return;
        }
      }
    }
    throw ((Throwable)new IllegalStateException("Already resumed"));
  }

如果当前continuation要继续时,状态可以从:

  • CoroutineSingletons.UNDECIDED -> paramObject

如果之前协程处于suspend状态,那就转成CoroutineSingletons.RESUMED状态,同时this.delegate.resumeWith(paramObject)

getOrThow
@Nullable
 public final Object getOrThrow()
 {
   Object localObject = this.result;
   if (localObject == CoroutineSingletons.UNDECIDED)
   {
     if (RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, IntrinsicsKt.getCOROUTINE_SUSPENDED())) {
       return IntrinsicsKt.getCOROUTINE_SUSPENDED();
     }
     localObject = this.result;
   }
   if (localObject == CoroutineSingletons.RESUMED) {
     return IntrinsicsKt.getCOROUTINE_SUSPENDED();
   }
   if ((localObject instanceof Result.Failure)) {
     throw ((Result.Failure)localObject).exception;
   }
   return localObject;
 }

如果想要拿到结果时,当前result仍为CoroutineSingletons.UNDECIDED,那就说明当前协程需要暂停,状态转为IntrinsicsKt.getCOROUTINE_SUSPENDED()

CombinedContext

经常会看到:

context get() = job() + IO....

这里+号就重载成了CombinedContext

IntrinsicsKt__IntrinsicsJvmKt

createCoroutineUnintercepted

直接看简单点的吧:

if ((paramFunction2 instanceof BaseContinuationImpl)) {
    return ((BaseContinuationImpl)paramFunction2).create(paramR, localContinuation);
  }

private static final <T> Object startCoroutineUninterceptedOrReturn(@NotNull Function1<? super Continuation<? super T>, ? extends Object> paramFunction1, Continuation<? super T> paramContinuation)
  {
    if (paramFunction1 == null) {
      throw new TypeCastException("null cannot be cast to non-null type (kotlin.coroutines.Continuation<T>) -> kotlin.Any?");
    }
    return ((Function1)TypeIntrinsics.beforeCheckcastToFunctionOfArity(paramFunction1, 1)).invoke(paramContinuation);
  }

其实就是function.invoke()..

具体例子

fun main(args: Array<String>) {
       log("before coroutine")
       //启动我们的协程
       asyncCalcMd5("test.zip") {
           log("in coroutine. Before suspend.")
           //暂停我们的线程,并开始执行一段耗时操作
           val result: String = suspendCoroutine {
                   continuation ->
               log("in suspend block.")
               continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
               log("after resume.")
           }
           log("in coroutine. After suspend. result = $result")
       }
       log("after coroutine")
   }

   /**
    * 上下文,用来存放我们需要的信息,可以灵活的自定义
    */
   class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){
       companion object Key : CoroutineContext.Key<FilePath>
   }

   fun asyncCalcMd5(path: String, block: suspend () -> Unit) {
       val continuation = object : Continuation<Unit> {
           override fun resumeWith(result: Result<Unit>) {
               log("resume: $result")
           }

           override val context: CoroutineContext
               get() = FilePath(path)

       }
       block.startCoroutine(continuation)
   }

   fun calcMd5(path: String): String{
       log("calc md5 for $path.")
       //暂时用这个模拟耗时
       Thread.sleep(1000)
       //假设这就是我们计算得到的 MD5 值
       return System.currentTimeMillis().toString()
   }

这里block被转成:

传入的continuation即为block.startCoroutine(continuation),startCoroutine会被编译器转化为startCoroutine(Function, Continutation),这里

  • Function: block
  • Continuation: val continutaion

startCoroutine

 @SinceKotlin(version="1.3")
public static final <R, T> void startCoroutine(@NotNull Function2<? super R, ? super Continuation<? super T>, ? extends Object> paramFunction2, R paramR, @NotNull Continuation<? super T> paramContinuation)
{
  Intrinsics.checkParameterIsNotNull(paramFunction2, "receiver$0");
  Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
  Continuation localContinuation = IntrinsicsKt.intercepted(IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation));
  Unit localUnit = Unit.INSTANCE;
  localContinuation.resumeWith(Result.constructor-impl(localUnit));
}

先创建,再resume

创建分两步:

IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation)

if ((paramFunction1 instanceof BaseContinuationImpl)) {
  return ((BaseContinuationImpl)paramFunction1).create(localContinuation);
}

IntrinsicsKt.intercepted

那我们俩看block转换成了什么样子的paramFunction:


final class block extends SuspendLambda
  implements Function1<Continuation<? super Unit>, Object>{
      @NotNull
  public final Continuation<Unit> create(@NotNull Continuation<?> paramContinuation)
  {
    //create出baseCoroutineImpl
    Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
    return new 1(this.this$0, paramContinuation);
  }

  public final Object invoke(Object paramObject)
  {
    return ((1)create((Continuation)paramObject)).invokeSuspend(Unit.INSTANCE);
  }

  //resumeWith以后,先走invokeSuspend
  @Nullable
  public final Object invokeSuspend(@NotNull Object paramObject)
  {
    Object localObject1 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    Object localObject2;
    switch (this.label)
    {
    default: 
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    case 1: 
      ((1)this.L$0);
      if (!(paramObject instanceof Result.Failure)) {
        localObject2 = paramObject;
      } else {
        throw ((Result.Failure)paramObject).exception;
      }
      break;
    case 0: 
      if ((paramObject instanceof Result.Failure)) {
        break label276;
      }
      this.this$0.log("in coroutine. Before suspend.");
      this.L$0 = this;
      this.label = 1;
      SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
      Continuation localContinuation = (Continuation)localSafeContinuation;
      this.this$0.log("in suspend block.");
      HongMoActivity localHongMoActivity1 = this.this$0;
      CoroutineContext.Element localElement = localContinuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
      if (localElement == null) {
        Intrinsics.throwNpe();
      }
      String str1 = localHongMoActivity1.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
      localContinuation.resumeWith(Result.constructor-impl(str1));
      this.this$0.log("after resume.");
      localObject2 = localSafeContinuation.getOrThrow();
      if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbesKt.probeCoroutineSuspended(this);
      }
      if (localObject2 == localObject1) {
        return localObject1;
      }
      break;
    }
    String str2 = (String)localObject2;
    HongMoActivity localHongMoActivity2 = this.this$0;
    StringBuilder localStringBuilder = new StringBuilder();
    localStringBuilder.append("in coroutine. After suspend. result = ");
    localStringBuilder.append(str2);
    localHongMoActivity2.log(localStringBuilder.toString());
    return Unit.INSTANCE;
    label276:
    throw ((Result.Failure)paramObject).exception;
  }
}

转成safeContinuation后,计算得到结果resumeWith+getOrThrow然后传递给completion

关键代码在于:

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }

这里会转换成cps代码,

  • 使用suspendCoroutine时,先转成SafeContinuation,
  • getOrThrow如果拿到的是suspend,直接返回。
  • 返回到前面BaseContinuationImpl的循环后,因为completion不是BaseContinuationImpl,跳出,走到competion.resumeWith结束。

异步时

case 0: 
      if ((paramObject instanceof Result.Failure)) {
        break label228;
      }
      this.this$0.log("in coroutine. Before suspend.");
      this.L$0 = this;
      this.label = 1;
      SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
      Continuation localContinuation = (Continuation)localSafeContinuation;
      this.this$0.log("in suspend block.");
      HongMoActivity.access$getExecutor$p(this.this$0).submit((Runnable)new HongMoActivity.main.1.invokeSuspend..inlined.suspendCoroutine.lambda.1(localContinuation, this));
      localObject2 = localSafeContinuation.getOrThrow();
      if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbesKt.probeCoroutineSuspended(this);
      }
      if (localObject2 == localObject1) {
        return localObject1;
      }
      break;
    }

可以看到submit后,直接getOrThrow此时拿到的为suspend所以直接返回,

runnable中:

public final void run()
  {
    Continuation localContinuation = this.$continuation;
    HongMoActivity localHongMoActivity = this.this$0.this$0;
    CoroutineContext.Element localElement = this.$continuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
    if (localElement == null) {
      Intrinsics.throwNpe();
    }
    String str = localHongMoActivity.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
    localContinuation.resumeWith(Result.constructor-impl(str));
    this.this$0.this$0.log("after resume.");
  }

safeContinunation重新resumeWith会进入label = 1返回值给completion

总结

suspend代表挂起,即线程执行到这里时可能会直接break返回,同时会增加一个continuation代表一个继续点,这个也好理解,下次被resume时就会从这个continuation继续执行。

待续

  • async|launch等是如何精简协程操作的
  • intercept是怎么操作的
  • combineContext到底有什么意义
  • 异步是如何拿到continuation且继续的
  • suspendCoroutine用什么进行替代

下期继续学习上面这些疑惑,btw新年快乐啊,这篇拖了太久了打脸

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容