Kotlin协程是一种编程思想,其中一个比较重要的应用场景就是线程控制。
以往我们在需要并发的时候,往往会用到java 的Executor和Android 的AsyncTask, 这样会导致跟直接使用Thead一样的困难和麻烦
1、线程什么时候结束
2、线程间的互相通信
3、多个线程的管理
然后就有了比较出名的Rxjava,它采用[Obervable]的编程范式的链式调用方式,解决了callback比较多的难题。而同样Kotlin 协程也能够解决这个问题,而且它以同步的方式写异步代码,从代码角度来看结构更清晰。
举个例子:
coroutineScope.launch(Dispatcher.Main){
val user = withContext(Dispatcher.IO){
api.login() // IO 线程执行请求
}
nameTv.text = user.name // 主线程更新ui
}
是不是比较简单,另外Kotlin协程还可以处理并行的请求处理,比如
coroutineScope.launch(Dispatcher.Main){
val avatar = async { api.getAvatar(user)}
val logo = async { api.getLogo(user)}
show(avatar.await(), logo.await())
}
async函数返回的是Deferred类型,意思是延时,稍后拿到结果,取结果需调用Deferred.await()方法。
suspend 函数
suspend标注的函数是一个耗时函数,该函数只能在kotlin协程函数或者是另外一个suspend函数里面被调用。而真正实施线程切换的是withContext(Dispatchers.IO)。
这里先介绍一下Dispatchers.IO是个什么东西
public val IO: CoroutineDispatcher = DefaultScheduler.IO
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
CoroutineDispatcher 是所有Coroutine Dispatcher的父类
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 判断是否需求分发
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//分发runnable
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//创建一个DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
这里知道Dispatchers.IO是个CoroutineDispatcher的实例就行了,具体它是如何实现线程间的调度的我们稍后再说。
我们先看启动一个协程的过程
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//调用内部的plus方法生成一个新的CoroutineContext
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
//AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// 调用CoroutineStart里面的invoke方法
start(block, receiver, this)
}
//CoroutineStart
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
//1、
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R, completion: Continuation<T>
): Continuation<Unit> {
//createCoroutineUnintercepted 创建一个Continuation对象,就是我们反编译的时候的那个create方法
return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
}
@NotNull
public final Continuation create(@NotNull FlowCollector $this$create, Object it, @NotNull Continuation continuation) {
PlantListViewModel$$special$$inlined$flatMapLatest$1 var4 = new PlantListViewModel$$special$$inlined$flatMapLatest$1(continuation, this.$plantRepository$inlined);
var4.p$ = $this$create;
var4.p$0 = it;
return var4;
}
//2、
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//3、DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//这里判断是否需要切换线程
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//调用对应的调度器,分发事件,实现线程切换
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//不需要切换
resumeUndispatchedWith(result)
}
}
}
}
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}
continuation.resumeWith()调用的是SuspendLambda.resumeWith,然后它调用的是父类的BaseContinuationImpl里面的resumeWith
//BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
/// 调用SuspendLambda.invokeSuspend方法
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
协程的线程调度
主线程
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
//MainDispatcherLoader
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
// 初始化factories,List<MainDispatcherFactory>
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
createMissingDispatcher(cause, hintOnError())
}
//AndroidDispatcherFactory
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
//HandlerContext
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//invokeImmediately 默认为false
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
// 用handler分发事件
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
handler.removeCallbacks(block)
}
}
}
主线程的线程调度,使用的是handler进行消息的分发。
Dispatchers.IO
//DefaultScheduler
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
//ExperimentalCoroutineDispatcher
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//分发事件
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}
//...
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
try {
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
// CoroutineScheduler 继承至Executor 线程池
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
// task是继承runnable
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
// Worker 继承 Thread, 里面有一个local queue
val currentWorker = currentWorker()
//先加入到local queue里面
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
// 加入到local queue失败,就加入到global queue里面
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
Dispatchers.IO复杂一点,内部实现了线程池Executor,通过线程池进行事件的分发,原理也是类似的。
到此,Dispatchers.IO也分析完毕。
最后,说一下协程容易搞错的一些问题
1). 协程支持并发吗?开启多个协程改变某一个值,对值有什么影响?有什么办法可以避免?
首先,协程肯定是可以支持并发的,Dispatchers.IO以及Dispatchers.Default内部都是采用线程池来执行的,默认的核心线程数是CUP的核数。既然支持并发,那对同时对某个值进行赋值,肯定会导致该值混乱。那么遇到这种情况,我们有几种办法处理
- 可以采用单线程的模式,比如Dispatchers.Unconfined,或者withContext()的方式启动协程。虽然launch()和withContext()都可以开启一个协程,但是launch是并行启动,而withContext却是串行的。用async{}.await()也是可以的,await()是一个挂起函数,效果类似withContext().
- 使用Mutex,也就是加锁机制,和java的synchronized一样。使用mutex.withLock{*}实现数据同步。当然用Atomic也是可行的。
2). 启动协程的方式有launch 、withContext、async三种,各有什么区别?
launch是一个非suspend函数,也就是说它是非阻塞式的,可以并行执行
withContext是一个suspend函数,所以它是串行的
async不使用await()的时候跟launch一样,使用await()的时候跟withContext一样,因为await是一个suspend函数。async适用于多个并行任务,而且需要等待返回结果的情况下。