Kotlin协程

Kotlin协程是一种编程思想,其中一个比较重要的应用场景就是线程控制。
以往我们在需要并发的时候,往往会用到java 的Executor和Android 的AsyncTask, 这样会导致跟直接使用Thead一样的困难和麻烦
1、线程什么时候结束
2、线程间的互相通信
3、多个线程的管理

然后就有了比较出名的Rxjava,它采用[Obervable]的编程范式的链式调用方式,解决了callback比较多的难题。而同样Kotlin 协程也能够解决这个问题,而且它以同步的方式写异步代码,从代码角度来看结构更清晰。

举个例子:

coroutineScope.launch(Dispatcher.Main){
val user = withContext(Dispatcher.IO){
        api.login() // IO 线程执行请求
}
nameTv.text = user.name // 主线程更新ui
}

是不是比较简单,另外Kotlin协程还可以处理并行的请求处理,比如

coroutineScope.launch(Dispatcher.Main){
val avatar = async { api.getAvatar(user)}
val logo = async { api.getLogo(user)}
show(avatar.await(), logo.await())
}

async函数返回的是Deferred类型,意思是延时,稍后拿到结果,取结果需调用Deferred.await()方法。

suspend 函数
suspend标注的函数是一个耗时函数,该函数只能在kotlin协程函数或者是另外一个suspend函数里面被调用。而真正实施线程切换的是withContext(Dispatchers.IO)。

这里先介绍一下Dispatchers.IO是个什么东西

public val IO: CoroutineDispatcher = DefaultScheduler.IO
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

CoroutineDispatcher 是所有Coroutine Dispatcher的父类

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 判断是否需求分发
 public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//分发runnable
 public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//创建一个DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

 public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
 val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )

这里知道Dispatchers.IO是个CoroutineDispatcher的实例就行了,具体它是如何实现线程间的调度的我们稍后再说。
我们先看启动一个协程的过程

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
//调用内部的plus方法生成一个新的CoroutineContext
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
//AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
// 调用CoroutineStart里面的invoke方法
        start(block, receiver, this)
    }
//CoroutineStart
 public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))

}
//1、
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R, completion: Continuation<T>
): Continuation<Unit> {
//createCoroutineUnintercepted 创建一个Continuation对象,就是我们反编译的时候的那个create方法
    return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
}
 @NotNull
   public final Continuation create(@NotNull FlowCollector $this$create, Object it, @NotNull Continuation continuation) {
      PlantListViewModel$$special$$inlined$flatMapLatest$1 var4 = new PlantListViewModel$$special$$inlined$flatMapLatest$1(continuation, this.$plantRepository$inlined);
      var4.p$ = $this$create;
      var4.p$0 = it;
      return var4;
   }
//2、
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
  public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
//CoroutineDispatcher
   public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
//3、DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

 inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
//这里判断是否需要切换线程
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
      //调用对应的调度器,分发事件,实现线程切换
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
//不需要切换
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
 inline fun resumeUndispatchedWith(result: Result<T>) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWith(result)
        }
    }

continuation.resumeWith()调用的是SuspendLambda.resumeWith,然后它调用的是父类的BaseContinuationImpl里面的resumeWith

//BaseContinuationImpl
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
/// 调用SuspendLambda.invokeSuspend方法
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

协程的线程调度

主线程

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
//MainDispatcherLoader
 
 val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
 private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
           // 初始化factories,List<MainDispatcherFactory>
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        createMissingDispatcher(cause, hintOnError())
    }
//AndroidDispatcherFactory
 override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
//HandlerContext

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
//invokeImmediately 默认为false
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
// 用handler分发事件
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        return object : DisposableHandle {
            override fun dispose() {
                handler.removeCallbacks(block)
            }
        }
    }

主线程的线程调度,使用的是handler进行消息的分发。

Dispatchers.IO

//DefaultScheduler
 val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
//ExperimentalCoroutineDispatcher
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
//分发事件
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatch(context, block)
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatchYield(context, block)
        }
//...
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            // TaskContext shouldn't be lost here to properly invoke before/after task
            DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
        }
    }

    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
// CoroutineScheduler 继承至Executor 线程池
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
 override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
// task是继承runnable
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        // Worker 继承 Thread, 里面有一个local queue
        val currentWorker = currentWorker()
//先加入到local queue里面
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
// 加入到local queue失败,就加入到global queue里面
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}

Dispatchers.IO复杂一点,内部实现了线程池Executor,通过线程池进行事件的分发,原理也是类似的。
到此,Dispatchers.IO也分析完毕。

最后,说一下协程容易搞错的一些问题
1). 协程支持并发吗?开启多个协程改变某一个值,对值有什么影响?有什么办法可以避免?
首先,协程肯定是可以支持并发的,Dispatchers.IO以及Dispatchers.Default内部都是采用线程池来执行的,默认的核心线程数是CUP的核数。既然支持并发,那对同时对某个值进行赋值,肯定会导致该值混乱。那么遇到这种情况,我们有几种办法处理

  1. 可以采用单线程的模式,比如Dispatchers.Unconfined,或者withContext()的方式启动协程。虽然launch()和withContext()都可以开启一个协程,但是launch是并行启动,而withContext却是串行的。用async{}.await()也是可以的,await()是一个挂起函数,效果类似withContext().
  2. 使用Mutex,也就是加锁机制,和java的synchronized一样。使用mutex.withLock{*}实现数据同步。当然用Atomic也是可行的。

2). 启动协程的方式有launch 、withContext、async三种,各有什么区别?
launch是一个非suspend函数,也就是说它是非阻塞式的,可以并行执行
withContext是一个suspend函数,所以它是串行的
async不使用await()的时候跟launch一样,使用await()的时候跟withContext一样,因为await是一个suspend函数。async适用于多个并行任务,而且需要等待返回结果的情况下。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • [TOC] 简介 Coroutines are computer program components that ...
    Whyn阅读 6,065评论 5 15
  • 在今年的三月份,我因为需要为项目搭建一个新的网络请求框架开始接触 Kotlin 协程。那时我司项目中同时存在着两种...
    业志陈阅读 1,156评论 0 5
  • 为什么要搞出和用协程呢 是节省CPU,避免系统内核级的线程频繁切换,造成的CPU资源浪费。好钢用在刀刃上。而协程是...
    静默的小猫阅读 684评论 0 2
  • 协程是轻量级的线程。 kotlin协程是kotlin的扩展库(kotlinx.coroutines)。 线程在An...
    付小影子阅读 6,523评论 0 4
  • 在今年的三月份,我因为需要为项目搭建一个新的网络请求框架开始接触 Kotlin 协程。那时我司项目中同时存在着两种...
    Android开发指南阅读 894评论 0 2