前言
协程系列文章:
上篇文章分析了协程切换到主线程执行的详细流程,本篇将分析如何切换到子线程执行。
通过本篇文章,你将了解到:
- 切换到子线程场景
- Dispatchers.Default 分发流程详解
- Dispatchers.IO 分发流程详解
- 与Java 线程池比对
- 协程到底在哪个线程执行?
1. 切换到子线程场景
Demo 展示
先看一个最常见的网络请求Demo:
fun showStuName() {
GlobalScope.launch(Dispatchers.Main) {
var stuInfo = withContext(Dispatchers.IO) {
//模拟网络请求
Thread.sleep(3000)
"我是小鱼人"
}
//展示
Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show()
}
}
因为是耗时操作,因此需要切换到子线程处理,又因为是网络请求,属于I/O操作,因此使用Dispatchers.IO 分发器。
若任务偏向于计算型,比较耗费CPU,可以改写如下:
fun dealCpuTask() {
GlobalScope.launch(Dispatchers.Main) {
//切换到子线程
withContext(Dispatchers.Default) {
var i = 0
val count = 100000
while(i < count) {
Thread.sleep(1)
}
}
}
}
Dispatchers.IO/Dispatchers.Default 异同
两者都是协程分发器,Dispatchers.IO 侧重于任务本身是阻塞型的,比如文件、数据库、网络等操作,此时是不怎么占用CPU的。而Dispatchers.Default 侧重于计算型的任务,可能会长时间占用CPU。
协程线程池在设计的时候,针对两者在线程的调度策略上有所不同。
2. Dispatchers.Default 分发流程详解
任务分发
以上面的Demo 为例,从源码角度分析分发流程。
从前面的文章很容易了解到:withContext()函数里构造了DispatchedContinuation,它本身也是个Runnable,通过:
//this 指DispatchedContinuation 本身
dispatcher.dispatch(context, this)
进行分发。
而dispatcher 就是分发器,我们这里用的是Dispatchers.Default,因此先来看看它的实现。
#Dispatchers.kt
actual object Dispatchers {
@JvmStatic
actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}
可以看出Dispatchers 是个单例。
#CoroutineContext.kt
//useCoroutinesScheduler 默认为true
//使用DefaultScheduler
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
#Dispatcher.kt
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
//定义IO 分发器
//...
}
DefaultScheduler 也是个单例,内容不多,其功能实现还得继续往上看。
ExperimentalCoroutineDispatcher 定义如下:
#Dispatcher.kt
open class ExperimentalCoroutineDispatcher(
//核心线程数
private val corePoolSize: Int,
//最大线程个数
private val maxPoolSize: Int,
//空闲线程的存活时间
private val idleWorkerKeepAliveNs: Long,
//线程名前缀
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
//初始化参数的值
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
//真正的线程池实现
private var coroutineScheduler = createScheduler()
//分发
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//分发实现
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
//...
}
}
//真正的线程池实现为:CoroutineScheduler
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
查看CoroutineScheduler.dispatch()函数:
//block 为DispatchedContinuation
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
//构建Task对象,block 本身就是Task类型
val task = createTask(block, taskContext)
//当前线程是否是Worker类型,也就是说当前线程是否是线程池内的线程
val currentWorker = currentWorker()//①
//如果是,则尝试提交任务到本地队列,否则返回任务本身
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)//②
if (notAdded != null) {
//如果没有提交到本地队列,则提交到全局队列
if (!addToGlobalQueue(notAdded)) {//③
//添加队列失败则抛出异常
throw RejectedExecutionException("$schedulerName was terminated")
}
}
//是否需要跳过唤醒线程,主要用在IO分发器
val skipUnpark = tailDispatch && currentWorker != null
if (task.mode == TASK_NON_BLOCKING) {//④
if (skipUnpark) return
//非阻塞任务,唤醒cpu 线程
signalCpuWork()//⑤
} else {
//阻塞任务,唤醒blocking 线程
signalBlockingWork(skipUnpark = skipUnpark)//⑥
}
}
这函数是分发核心,注释里标明了6个点,现在一一阐述:
①
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
Worker 本身是继承自Thread 的,因此Worker 是线程类,代表线程池里的线程。通过判断是否是Worker类型来确认当前线程是否属于线程池内的线程。
②
private fun CoroutineScheduler.Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
//Worker 为空,直接返回任务本身
if (this == null) return task
//非阻塞的任务,则直接返回
if (task.mode == TASK_NON_BLOCKING && state === CoroutineScheduler.WorkerState.BLOCKING) {
return task
}
//表示本地队列里存有任务了
mayHaveLocalTasks = true
//加入到本地队列里
//localQueue 为Worker的成员变量
return localQueue.add(task, fair = tailDispatch)
}
③
若是②没有成功加入到本地队列里,则尝试加入到全局队列里:
private fun addToGlobalQueue(task: Task): Boolean {
return if (task.isBlocking) {
//加入到阻塞队列
globalBlockingQueue.addLast(task)
} else {
//加入到cpu队列
globalCpuQueue.addLast(task)
}
}
结合②③分析,目前为止,出现了三个队列:
④
主要用于判断任务是阻塞还是非阻塞的,这在任务构造的时候就已经指定,若是使用Dispatchers.Default 分发器,那么构造的任务是非阻塞的,而使用Dispatchers.IO,则构造的任务是阻塞的。
⑤
⑤⑥ 是针对阻塞与否进行不同的处理。
fun signalCpuWork() {
//尝试去唤醒正在挂起的线程,若是有线程可以被唤醒,则无需创建新线程
if (tryUnpark()) return
//若唤醒不成功,则需要尝试创建线程
if (tryCreateWorker()) return
//再试一次,边界条件
tryUnpark()
}
tryUnpark()函数主要作用是从栈里取出挂起的线程(Worker),入栈的的时机是当线程没有任务可以处理时进行挂起,此时会记录在栈里。
重点是tryCreateWorker()函数:
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
//获取当前已经创建的线程数
val created = createdWorkers(state)
//获取当前阻塞的任务数
val blocking = blockingTasks(state)
//已创建的线程数-阻塞的任务数=非阻塞的线程数
//coerceAtLeast(0) 表示结果至少是0
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//如果非阻塞数小于核心线程数
if (cpuWorkers < corePoolSize) {
//创建线程
val newCpuWorkers = createNewWorker()
//如果当前只有一个非阻塞线程并且核心线程数>1,那么再创建一个线程
//目的是为了方便"偷"任务...
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
//创建成功
if (newCpuWorkers > 0) return true
}
return false
}
创建新线程为什么与阻塞任务的多少关联呢?
简单举个例子:
- 现在若是已经创建了5个线程,而这几个线程都在执行IO任务,此时就需要再创建新的线程来执行任务,因为此时CPU是空闲的。
- 只要非阻塞任务的个数小于核心线程数,那么就需要创建新的线程,目的是为了充分利用CPU。
再看createNewWorker() 是如何创建新的线程(Worker)的。
private fun createNewWorker(): Int {
//workers 为Worker 数组,因为需要对数组进行add 操作,因此需要同步访问
synchronized(workers) {
if (isTerminated) return -1
val state = controlState.value
//获取已创建的线程数
val created = createdWorkers(state)
//阻塞的任务数
val blocking = blockingTasks(state)
//非阻塞的线程数
val cpuWorkers = (created - blocking).coerceAtLeast(0)
//非阻塞的线程数不能超过核心线程数
if (cpuWorkers >= corePoolSize) return 0
//已创建的线程数不能大于最大线程数
if (created >= maxPoolSize) return 0
val newIndex = createdWorkers + 1
require(newIndex > 0 && workers[newIndex] == null)
//构造线程
val worker = Worker(newIndex)
//记录到数组里
workers[newIndex] = worker
//记录创建的线程数
require(newIndex == incrementCreatedWorkers())
//开启线程
worker.start()
//当前非阻塞线程数
return cpuWorkers + 1
}
}
⑥
signalBlockingWork()函数调用时会记录阻塞的任务数,其它与signalCpuWork 一致。
至此,Dispatchers.Default 任务分发流程已经结束,其重点:
- 构造任务,添加到队列里(三个队列中选一个)。
- 唤醒挂起的线程或是创建新的线程。
任务执行
既然任务都提交到队列了,该线程出场执行任务了。
internal inner class Worker private constructor() : Thread() {}
Worker 创建并启动后,将会执行run()函数:
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
//一直查找,除非worker终止了
while (!isTerminated && state != CoroutineScheduler.WorkerState.TERMINATED) {
//从队列里寻找任务
//mayHaveLocalTasks:本地队列里是否有任务
val task = findTask(mayHaveLocalTasks) //①
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//任务获取到后,执行任务
executeTask(task)//②
//任务执行完毕,继续循环查找任务
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
//延迟一会儿,再去偷
if (!rescanned) {
rescanned = true
} else {
//挂起一段时间
}
continue
}
//尝试挂起
tryPark()//③
}
//释放token
tryReleaseCpu(CoroutineScheduler.WorkerState.TERMINATED)
}
同样的,标注了3个重点,一一分析之。
①
findTask()顾名思义:找任务。
传入的参数表示是否扫描本地队列,若是之前有提交任务到本地队列,则此处mayHaveLocalTasks = true。
fun findTask(scanLocalQueue: Boolean): Task? {
//尝试获取cpu 许可
//若是拿到cpu 许可,则可以执行任何任务
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
//拿不到,若是本地队列有任务,则从本地取,否则从全局阻塞队列取
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
//都拿不到,则偷别人的
return task ?: trySteal(blockingOnly = true)
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
if (scanLocalQueue) {
//可以从本地队列找
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
//从全局队列找
pollGlobalQueues()?.let { return it }
}
//偷别人的
return trySteal(blockingOnly = false)
}
此处解释一下获取cpu 许可的含义:
它和核心线程数相关,假设我们是8核CPU,那么同一时间最多只能有8个线程在CPU上执行。因此,若是其它线程想要执行非阻塞任务(占用CPU),需要申请许可(token),申请成功说明有CPU空闲,此时该线程可以执行非阻塞任务。否则,只能执行阻塞任务。
当从本地队列、全局队列里都没找出任务时,当前的Worker打起了别个Woker的主意。我们知道全局队列是所有Worker共享,而本地队列是每个Worker私有的。因此,当前Worker发现自己没任务可以执行的时候会去看看其它Worker的本地队列里是否有可以执行的任务,若是有就可以偷过来用。
看看如何偷的:
private fun trySteal(blockingOnly: Boolean): Task? {
//自己本地没有才能偷
kotlinx.coroutines.assert { localQueue.size == 0 }
//所有的已创建的workers个数
val created = createdWorkers
//遍历workers数组
repeat(created) {
++currentIndex
if (currentIndex > created) currentIndex = 1
val worker = workers[currentIndex]
if (worker !== null && worker !== this) {
//从别的worker里的本地队列偷到自己的本地队列
val stealResult = if (blockingOnly) {
localQueue.tryStealBlockingFrom(victim = worker.localQueue)
} else {
localQueue.tryStealFrom(victim = worker.localQueue)
}
//偷成功,则取出任务
if (stealResult == TASK_STOLEN) {
return localQueue.poll()
} else if (stealResult > 0) {
minDelay = min(minDelay, stealResult)
}
}
}
//...没偷到
return null
}
实际上偷的本质是:
从别人的本队队列里取出任务放到自己的本地队列,最后取出任务返回。
②
拿到任务后,就开始执行任务。
private fun executeTask(task: Task) {
//模式:阻塞/非阻塞
val taskMode = task.mode
idleReset(taskMode)
//当前任务是非阻塞任务,则尝试释放cpu token,并执行signalCpuWork
beforeTask(taskMode)
//真正执行任务
runSafely(task)
//修改状态
afterTask(taskMode)
}
fun runSafely(task: Task) {
try {
//task 其实就是DispatchedContinuation
task.run()
} catch (e: Throwable) {
//..
} finally {
unTrackTask()
}
}
此时线程正式执行任务了。
③
若是线程没有找到任何任务执行,则尝试挂起。
private fun tryPark() {
//没有在挂起栈里
if (!inStack()) {
//将worker放入挂起栈里
parkedWorkersStackPush(this)
return
}
//...
while (inStack() && workerCtl.value == CoroutineScheduler.PARKED) { // Prevent spurious wakeups
if (isTerminated || state == CoroutineScheduler.WorkerState.TERMINATED) break
//...
//真正挂起,并标记worker state 状态
park()
}
}
最后一步的park()里会修改state = WorkerState.TERMINATED,在最外层的循环里会判断该标记,若是终止了,则循环停止,整个线程执行结束。
至此,任务执行流程结束,其重点:
- 从全局队列、本地队列里查找任务。
- 若是没找到,则尝试从别的Worker 本地队列里偷取任务。
- 1、2 能够找到任务则执行Runnable.run()函数,该函数里最终会执行协程体里的代码。
- 若是没有任务,则根据策略挂起一段时间或是最终退出线程的执行。
结合任务分发与任务执行流程,有如下流程图:
3. Dispatchers.IO 分发流程详解
Dispatchers.IO 定义
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
//创建LimitingDispatcher
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
//...
}
Dispatchers.IO 作为DefaultScheduler 里的成员变量,并且它的分发器使用的是DefaultScheduler 本身。
构造函数里指明了并行的数量限制,以及它属于TASK_PROBABLY_BLOCKING(阻塞任务)。
任务分发
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
//记录在等待执行的任务
val inFlight = inFlightTasks.incrementAndGet()
//如果小于并行数
if (inFlight <= parallelism) {
//直接分发 dispatcher= DefaultScheduler
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
//等待执行的任务超过并行数,则加入到队列里
queue.add(taskToSchedule)
//碰运气,看是否有任务释放了
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
//若释放了,则取出队列里的任务执行
taskToSchedule = queue.poll() ?: return
}
}
可以看出Dispatchers.IO 任务分发是借助于DefaultScheduler,也就是Dispatchers.Default的能力,因此两者是共用一个线程池。
只是Dispatchers.IO 比较特殊,它有个队列,该队列作用:
当IO 任务分派个数超过设定的并行数时,不会直接进行分发,而是先存放在队列里。
那它什么时候取出来呢?
当任务执行完毕,也就是DispatchedTask.run()函数执行完毕后会调用:
taskContext.afterTask(),来看它的实现:
override fun afterTask() {
//从队列里取出
var next = queue.poll()
if (next != null) {
//继续分发
dispatcher.dispatchWithContext(next, this, true)
return
}
inFlightTasks.decrementAndGet()
//...
}
举个简单例子:
假设现在最大的并行数是64,线程池分配了64个线程执行IO任务,当第65个任务到来之时,因为超出了64,因此会放入队列里。当64个任务有某个任务执行完毕后,会从队列里取出第65个任务进行分发。
这样做的目的是什么呢?
为了限制突然间创建了许多IO线程,浪费资源,因此在线程池之外再加了一层防护,多出的任务先进入缓冲队列。
4. 与Java 线程池比对
使用过Java 线程池的小伙伴可能会知道,Java 线程池与Kotlin协程池 本质上都是:"池化技术的体现”。
它们的优势:
- 减少线程频繁开启/关闭的资源消耗。
- 及时响应并执行任务。
- 较好地管控/监控 应用内的线程使用。
Java 线程池原理:
- 核心线程+队列+非核心线程。
- 首先使用核心线程执行任务,若是核心线程个数已满,则将任务加入到队列里,核心线程从队列里取出任务执行,若是队列已满,则再开启非核心线程执行任务。
更详细的Java 线程池原理与使用请移步:Java 线程池之必懂应用-原理篇(上)
协程线程池原理:
- 全局队列(阻塞+非阻塞)+ 本地队列。
- IO 任务分发还有个缓存队列。
- 线程从队列里寻找任务(包括偷)并执行,若是使用IO 分发器,则超出限制的任务将会放到缓存队列里。
两者区别:
- Java 线程池开放API,比较灵活,调用者可以根据不同的需求组合不同形式的线程池,没有区分任务的特点(阻塞/非阻塞)。
- 协程线程池专供协程使用,区分任务特点,进而进行更加合理的调度。
5. 协程到底在哪个线程执行?
回到我们上篇末尾的问题:
fun launch3() {
GlobalScope.launch(Dispatchers.IO) {
println("1>>>${Thread.currentThread()}")
withContext(Dispatchers.Default) {
println("2>>>${Thread.currentThread()}")
delay(2000)
println("3>>>${Thread.currentThread()}")
}
println("4>>>${Thread.currentThread()}")
}
}
理解了线程池原理,答案就呼之欲出了。
1、4 可能不在同一线程。
2、3 可能不在同一线程。
1、2 可能在同一线程。
看到这结果,你可能会觉得:废话!
容我解释:因为线程池本身的调度侧重于执行任务,而非使用哪个特定的线程执行,因此具体分派到哪个线程执行需要看哪个线程刚好拿到了任务。
下篇将分析协程的取消与异常处理,敬请关注。
本文基于Kotlin 1.5.3,文中完整Demo请点击
您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力
持续更新中,和我一起步步为营系统、深入学习Android/Kotlin
1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易懂易学系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读