目前很多开发组都用上协程来处理异步任务了,但是有的地方协程提供的原生API还是不足以应付,比方说一些SDK提供了传入Executor的接口(以便复用调用者的线程池来执行异步任务),这时候可以用这JDK提供的线程池,或者封装一下协程也可以满足需求。
协程提供了Dispatchers.Default 和 Dispatchers.IO 分别用于 计算密集型 任务和 IO密集型 任务,类似于RxJava的 Schedulers.computation() 和 Schedulers.io()。
但两者有所差异,比如RxJava的 Schedulers.io() 不做并发限制,而 Dispatchers.io() 做了并发限制:
It defaults to the limit of 64 threads or the number of cores (whichever is larger)
考虑到当前移动设备的CPU核心数都不超过64,所以可以认为协程的 Dispatchers.IO 的最大并发为64。
Dispatchers.Default 的并发限制为:
By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two
考虑到目前Android设备核心数都在2个以上,所以可以认为 Dispatchers.Default 的最大并发为 CPU cores。
Dispatchers.Default 和 Dispatchers.IO 是共享协程自己的线程池的,二者可以复用线程。
不过目前这两个Dispatchers 并未完全满足项目中的需求,有时我们需要一些自定义的并发限制,其中最常见的是串行。
RxJava有Schedulers.single() ,但这个Schedulers.single()和AsyncTask的SERAIL_EXECOTOR一样,是全局串行,不同的任务处在同一个串行队列,会相互堵塞,因而可能会引发问题。
或许也是因为这个原因,kotlin协程没有定义“Dispatchers.Single"。
对于需要串行的场景,可以这样实现:
val coroutineContext: CoroutineContext =
Executors.newSingleThreadExecutor().asCoroutineDispatcher()
这样可以实现局部的串行,但和协程的线程池是相互独立的,不能复用线程。
线程池的好处:
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
然彼此独立创建线程池的话,会大打折扣。
如何既复用协程的线程池,又自主控制并发呢?
一个办法就是套队列来控制并发,然后还是任务还是执行在线程池之上。
AsyncTask 就是这样实现的:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
用SerialExecutor的execute的任务会先进入队列,当mActive为空时从队列获取任务赋值给mActive然后通过线程池 THREAD_POOL_EXECUTOR执行。
当然AsyncTask 的SerialExecutor是全局唯一的,所以会有上面提到的各种任务相互堵塞的问题。可以通过创建不同是的SerialExecutor实例来达到各业务各自串行。
在Kotlin环境下,我们可以利用协程和Channel来实现:
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
send(0)
CoroutineScope(Dispatchers.IO).launch {
block()
receive()
}
}
}
// 使用方法
private val serialChannel = Channel<Any>(1)
serialChannel.runBlock {
// do somthing
}
添加Log编写测试如下:
private val a = AtomicInteger(0)
private val b = AtomicInteger(0)
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
Log.d("MyTag", "before send " + a.getAndIncrement() + getTime())
send(0)
Log.i("MyTag", "after send " + b.getAndIncrement() + getTime())
CoroutineScope(Dispatchers.Default).launch {
block()
receive()
}
}
}
private fun test() {
// 并发限制为1,串行执行任务
val channel = Channel<Any>(1)
val t1 = System.currentTimeMillis()
repeat(4) { x ->
channel.runBlock {
Thread.sleep(1000L)
Log.w("MyTag", "$x done job" + getTime())
}
}
CoroutineScope(Dispatchers.Default).launch {
while (!channel.isEmpty) {
delay(200)
}
val t2 = System.currentTimeMillis()
Log.d("MyTag", "Jobs all done, use time:" + (t2 - t1))
}
}
执行结果:
第一个任务可以顺利通过send(), 而随后的任务被suspend, 直到前面的任务执行完(执行block),调用recevie(), 然后下一个任务通过send() ……依此类推。
最终,消耗4s完成任务。
如果Channel的参数改成2,则能有两个任务可以通过send() :
最终,消耗2s完成任务。
关于参数可以参考Channel的构造函数:
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
else -> ArrayChannel(capacity)
}
在前面的实现中, 我们关注UNLIMITED, BUFFERED 以及 capacity > 0 的情况即可:
- UNLIMITED: 不做限制;
- BUFFERED: 并发数由 kotlin "kotlinx.coroutines.channels.defaultBuffer"决定,目前测试得到8;
- capacity > 0, 则并发数由 capacity 决定;
- 特别地,当capacity = 1,为串行调度。
不过,[Dispatchers.IO] 本身有并发限制(目前版本是64),
所有对于 Channel.UNLIMITED 和 capacity > 64 的情况,和capacity=64的情况是相同的。
我们可以为不同的业务创建不同的Channel实例,从而各自控制并发且最终在协程的线程池上执行任务。
简要示意图如下:
为了简化,我们假设Dispatchers的并发限制为4。
- 不同Channel有各自的buffer, 当任务小于capacity时进入buffer, 大于capacity时新任务被suspend。
- Dispatchers 不断地执行任务然后调用receive(), 上面的实现中,receive并非要取什么信息,仅仅是让channel空出buffer, 好让被suspend的任务可以通过send()然后进入Dispatchers的调度。
- 极端情况下(进入Disptachers的任务大于并发限制时),任务进入Dispatchers也不会被立即执行,这个设定可以避免开启的线程太多而陷于线程上下文频繁切换的困境。
通过Channel可以实现并发的控制,但是日常开发中有的地方并不是简单地执行个任务,而是需要一个ExecutorService或者Executor。
我们可以通过Channel封装一下:
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
CoroutineScope(Dispatchers.Unconfined).launch {
send(0)
CoroutineScope(Dispatchers.IO).launch {
block()
receive()
}
}
}
class ChannelExecutor(capacity: Int) : Executor {
private val channel = Channel<Any>(capacity)
override fun execute(command: Runnable) {
channel.runBlock {
command.run()
}
}
}
class ChannelExecutorService(capacity: Int) : AbstractExecutorService() {
private val channel = Channel<Any>(capacity)
override fun execute(command: Runnable) {
channel.runBlock {
command.run()
}
}
fun isEmpty(): Boolean {
return channel.isEmpty || channel.isClosedForReceive
}
override fun shutdown() {
channel.close()
}
override fun shutdownNow(): MutableList<Runnable> {
shutdown()
return mutableListOf()
}
@ExperimentalCoroutinesApi
override fun isShutdown(): Boolean {
return channel.isClosedForSend
}
@ExperimentalCoroutinesApi
override fun isTerminated(): Boolean {
return channel.isClosedForReceive
}
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
var millis = unit.toMillis(timeout)
while (!isTerminated && millis > 0) {
try {
Thread.sleep(200L)
millis -= 200L
} catch (ignore: Exception) {
}
}
return isTerminated
}
}
需要简单地控制并发的地方,直接定义Channel然后调用runBlock即可;
需要Executor的地方,可创建ChannelExecutor来执行。