目录
[toc]
1、协程是什么
如果我们去维基百科,可以找到一段类似的话:
协程是一种非抢占式或者说协作式的计算机程序并发调度的实现,程序可以主动挂起或者恢复执行。
又如果你看了网上的很多文章,它们也可能这么说:
- 协程是轻量级的线程。
- 协程没有用户态到内核态的切换。
- 协程的调度是协作式的,而线程是抢占式的。
我们先不说上面的这些说法对不对。反正,相信无论是那种方式,看了之后你还是一脸懵逼。
事实上,协程是一种脱离语言的概念。从本质上说,协程就是一段程序,它能够被挂起,待会儿再恢复执行。这里的挂起和恢复执行都是程序主动控制的(所以叫协作式),而不是像线程那样是由操作系统调度的。每个语言对协程都有自己的实现(也可能压根儿就不支持协程,如Java)。这也是我们对协程这个概念感到模糊和混乱的原因,因为协程本身的定义就是模糊的,而每个语言的实现又都不太一样。
这篇文章,就以Kotlin中的协程为例。我们先介绍协程的基本用法,再看下它是怎么实现的。
2、为什么要用协程
我们先来看一个例子。假设有一个耗时的方法,用于获取一个Account
对象。耗时的原因,可能因为是需要IO操作,或者是需要大量CPU资源,又或者本身就是需要延时一段时间后执行。
那么,在Android中,为了防止它阻塞主线程造成ANR,我们可以这么写:
getAccount(object : Callback<Account> {
override fun onFail(code: Int, message: String) {
// show error
}
override fun onSuc(response: Account) {
// Do something with Account
}
})
这就是典型的回调式的写法。在getAccount
方法的内部,会把获取Account
的逻辑放到后台线程中异步执行,并在结果返回后通过回调的形式返回给上层。
这种写法的一大问题是,代码杂乱不堪,我们很容易陷入到回调地狱中。例如,如果在接口访问失败后需要增加重试的逻辑,要怎么写?如果在成功后,还需要请求另一个接口,又要怎么写?一层又一层的回调,直觉告诉你就是在写bug。
幸运的是,我们有Rxjava。利用RxJava,我们可以这么写:
Observable.create { emitter: ObservableEmitter<Account> -> emitter.onNext(getAccount()) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Do something with Account
}
RxJava的链式调用,使得代码简洁了很多。但是RxJava也有自身的问题,它的操作符太抽象了,以至于RxJava的学习成本很大。例如说,还是上面的例子,如果我们想要增加出错重试的逻辑,要怎么做?如果不熟悉retry这个操作符,你可能要花点时间了。
但是,在使用Kotlin的协程版本中,我们的代码是这个样子的:
coroutineScope.launch { // 启动一个协程
val account = withContext(Dispatchers.IO) {
// getAccount的逻辑会在IO线程中执行
getAccount()
}
// 自动切换回原来的线程
doSomethingWithAccount(account)
}
上面代码最神奇的地方在于withContext(Dispatchers.IO)
,这行代码使得getAccount
的逻辑会自动在IO线程执行。而更加神奇的地方在于,getAccount
执行完毕后会自动切换回原来的线程执行doSomethingWithAccount
。在这里,我们可以执行UI刷新的逻辑。这一切,都是编译器帮助我们完成的。整个过程中我们没有用到回调(至少没有显性地看到)。
这就是协程最大的一个好处。简单的说,就是用同步的方式写异步的代码。这个特性,让我们的代码变得无比简洁。
3、协程应用指北
3.1、Gradle配置
在Android中,如果我们要使用协程,首先需要在app的build.gradle中,添加依赖:
dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"
}
这里引入的两个依赖分别是协程的核心库与平台库。kotlin是一个跨平台的语言。例如在js本身就是单线程的事件循环,这与Android有较大的差别。平台库的作用,就是这一层差别的抽象。
3.2、创建第一个协程
协程的创建方法有3种,我们一个一个来看。
3.2.1、使用runBlocking
顶层函数
runBlocking {
print("Hello World!")
}
这个顶层函数会阻塞线程,直至runBlocking
中的内容执行完毕。所以,这个方法一般也不会用在业务开发中。毕竟,我们使用协程就是因为它并发的能力。
3.2.2、GlobalScope
GlobalScope.launch {
print("Hello World!")
}
GlobalScope
是一个全局的作用域,使用这种方法创建的协程,生命周期与APP的生命周期一致。
因为不能控制生命周期,这种方式可能会造成内存泄漏,一般在业务开发中不太常见。
3.2.3、使用CoroutineScope
对象
val context = SupervisorJob() + Dispatchers.Main
val coroutineScope = CoroutineScope(context)
coroutineScope.launch {
print("Hello World!")
}
这个也是官方推荐的方式,使用一个CoroutineScope
对象来创建协程。创建CoroutineScope
需要一个Context
对象(这个Context
对象和Android中常见的Context
不是一个概念)。
更简单的,我们也可以直接使用MainScope
这个对象。
上面例子中的GlobalScope
本质也是一个CoroutineScope
。
val scope = MainScope()
scope.launch {
print("Hello World!")
}
使用这种方式创建的协程,可以在必要的时候取消。
scope.cancel()
这在一些场合十分有用。例如,在Activity
销毁的时候取消所有的网络请求。
3.2.4、小结:创建协程的关键参数
我们以launch
为例,看一下协程启动都有那些参数。下面给出了launch
函数的定义。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
可以看到,创建一个协程总共有5个关键的参数,分别是:
-
CoroutineScope
协程作用域 -
CoroutineContext
协程的上下文 -
CoroutineStart
协程的启动模式 -
block
协程体 -
Job
作业
协程体没什么好说的,就像你调用Handle().post(Runnable{ })
时传入的Runnable
对象一样,是你希望交给协程去执行的东西。
下面的篇幅,我们介绍剩下的4个参数。为了便于理解,介绍的顺序会略有调整。
3.3、协程启动模式
协程的启动模式定义在CoroutineStart
中,是一个枚举类:
public enum class CoroutineStart {
DEFAULT, // 立即执行协程体
LAZY, // 只有在有必要的情况下才执行协程体
@ExperimentalCoroutinesApi
ATOMIC, // 立即执行协程体,但在开始运行之前无法取消
@ExperimentalCoroutinesApi
UNDISPATCHED; // 立即在当前线程执行协程体,直到第一个suspend调用
}
DEFAULT
和LAZY
是我们最常用的模式。LAZY
只有在有必要的情况下才执行协程体。可以看一下下面这个例子:
val deferred = async(start = CoroutineStart.LAZY) {
print("coroutine run")
"Hello World"
}
print(deferred.await()) // 1⃣️
例如,上面的代码,我们设置了启动模式为LAZY
。因此,直到y1⃣️处的代码被调用,async
所创建的协程才会开始执行。
ATOMIC
和UNDISPATCHED
还是实验性质的API,这里暂不介绍了。
3.4、Job
Job
是launch
方法的返回值,通常的理解是作业。Job
表示了一个协程的工作任务。Job
是一个接口,我们看一下它的定义:
public interface Job : CoroutineContext.Element
而这个CoroutineContext.Element
又继承自CoroutineContext
。也就是说,Job
本质上还是个CoroutineContext
。
Job
是用来干嘛的?看一下Job
的函数就明白了。
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun start(): Boolean
public fun cancel(cause: CancellationException? = null)
public suspend fun join()
3.4.1、launch
与async
前面我们举了个例子,使用launch
创建并启动了一个协程。
launch
定义在Builders
中。事实上,在Builders
中,除了launch
,还有另一个常用的创建协程的方法async
。
public fun CoroutineScope.launch(...): Job
public fun <T> CoroutineScope.async(...): Deferred<T> {
单看这两个函数的定义,除了返回值不同也没有什么太大的区别了。
这个Deferred
本质上就是继承自Job
,最重要的一点就是多了一个await
的函数。故名思义,我们可以通过这个函数来获取协程执行结果。
配合上文中提到的CoroutineStart.LAZY
,就可以实现只有在await
被调用的时候才能才会开始执行的异步任务。
launch
通常用于那些不关心结果的耗时任务,而async
通常用于那些需要返回值的耗时任务,如网络请求、读取数据库等。
3.5、CoroutineScope
CoroutineScope
表示一个协程的作用域。CoroutineScope
是一个接口,定义如下:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
可以看到,CoroutineScope
的定义非常简单。每个CoroutineScope
都持有CoroutineContext
对象。
CoroutineScope
的一个作用就是在一些场合方便地取消所有已启动的协程。
在创建协程的时候,我们可以在父协程内创建子协程。这个时候,父协程会限制子协程的生命周期, 子协程则继承父协程的上下文。
GlobalScope
就是CoroutineScope
的一个子类。
作用域还和异常的传播有关,这个我们放到后面再说。
3.6、CoroutineContext
协程创建过程中,接收CoroutineContext
对象作为协程的上下文。CoroutineContext
本质上是一个接口,它有很多的实现。协程上下文是协程创建过程中很重要的一个参数。
在实际使用过程中,可以创建多个上下文,并使用+
操作符将其连在一起。相同类型的Context
,右边覆盖左边的。
Dispatchers.Main + CoroutineName("Get AccountInfo Coroutine")
如果把CoroutineContext
的接口和List
进行对比,你会觉得两者出奇地相似!
public operator fun plus(context: CoroutineContext): CoroutineContext
public fun minusKey(key: Key<*>): CoroutineContext
public operator fun <E : Element> get(key: Key<E>): E?
CoroutineContext
可以说就是一个以Key
为索引的List
。所以,本质上,CoroutineContext
就是一个数据结构而已。我们也可以通过Key
来查找某个具体的CoroutineContext
,如:
coroutineContext[CoroutineName]
接下来介绍一些常见的CoroutineContext
的实现。
3.6.1、CoroutineName
CoroutineName
的作用是为协程命名,类似于java中为线程添加名称:
new Thread().setName("Get AccountInfo Thread");
在协程中,通过下面的代码为创建的协程添加名称:
launch(CoroutineName("Get AccountInfo Coroutine")) {
}
2.6.2、Dispatchers
调度器
Dispatchers
中有4个默认的调度器可供使用,他们都继承自CoroutineDispatcher
:
-
Dispatchers.Unconfined
不指定线程, 如果子协程切换线程那么接下来的代码也运行在该线程上 -
Dispatchers.IO
适用于IO读写,底层用线程池实现 -
Dispatchers.Main
根据平台不同而有所差, Android平台上的实现为HandlerContext
(这里也说明了,在Android平台上,把一个协程绑定到主线程上执行,最终还是回到了Android平台的Handler
的那一套) -
Dispatchers.Default
默认调度器, 在线程池中执行协程体, 适用于计算操作
3.7、异常处理
3.7.1、全局的异常处理
无论是线程还是RxJava,都有默认的异常处理器。例如,我们可以为线程设置一个默认的异常处理:
Thread.setDefaultUncaughtExceptionHandler { t: Thread, e: Throwable ->
println("Thread '${t.name}' throws an exception with message '${e.message}'")
}
我们也可以为RxJava设置默认的异常处理器:
RxJavaPlugins.setErrorHandler(e -> {
println("Throws an exception with message '${e.message}'")
});
同样,协程也可以添加默认的异常处理器。
// 创建异常处理器,本质上仍然是 coroutineContext
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
print("Throws an exception with message: ${throwable.message}")
}
MainScope().launch(exceptionHandler) {
throw IllegalArgumentException("wrong argument")
}
上面这段程序在运行的时候并不会崩溃,抛出的IllegalArgumentException
的异常被我们自定义的异常处理器捕捉到了。
3.7.2、async
的await
与join
上面我们以launch
为例介绍了CoroutineExceptionHandler
。但是值得注意的是,使用async
时,我们发现异常默默地消失了。即使没有定义异常处理器,程序也不会崩溃。
MainScope().async {
throw IllegalArgumentException("wrong argument")
}
这是因为async
的设计思路与launch
不同。对async
来说,异常只有在调用await
的时候才会消费。这也很好理解嘛,当我期望获取async
的结果时,程序已经发生了异常,没有办法给出正确的结果。此时,只好抛出一个异常。
但是另一个方法join
则不一样了。join
只关心协程是否执行完成,但并不关心是异常结束还是正常结束。即使我们用launch
替代async
,结果也是一样的。在join
的调用处并不会抛出异常。
总结一下:
- 对于
async
返回的deferred
来说,它有两个方法。当发生异常时,join
并不关心是否发生异常,只关心协程是否结束。而await
则会在调用处抛出异常。 -
launch
中未捕获的异常与async
的处理方式不同,launch
会直接抛出给父协程,如果没有父协程或父协程不响应(如supervisorScope
,下面会介绍),那么就交给上下文中指定的CoroutineExceptionHandler
处理,如果没有指定,那传给全局的CoroutineExceptionHandler
等等,而async
则要等await
来消费。 - 不管是哪个启动器,在应用了作用域之后,都会按照作用域的语义进行异常扩散,进而触发相应的取消操作,对于
async
来说就算不调用await
来获取这个异常,它也会在coroutineScope
当中触发父协程的取消逻辑。
看下面这个例子:
MainScope().launch { // 1⃣️
val deferred = async(exceptionHandler) {
throw IllegalArgumentException("wrong arg") // 2⃣️
}
try {
// 尽管这里捕获了异常,但是在2⃣️处抛出的异常仍然会扩散到1⃣️处,导致崩溃
deferred.await()
} catch (e : IllegalArgumentException) {
print("catch exception: ${e.message}")
}
}
3.7.3、作用域
异常的传播还和协程的作用域有关。
在前面我们已经看到,子协程出现异常会导致父协程同时被取消。那么有没有什么办法,把错误限制在子协程中呢?
答案就是使用supervisorScope
。
-
coroutineScope
内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。如果一个子协程异常退出,那么父协程也会退出。同样,父协程的异常也会导致所有子协程异常终止。同时,这也是协程内部再启动子协程的默认作用域。 -
supervisorScope
内部的取消操作是单向传播的,父协程向子协程传播,子协程的错误不会传播给父协程和它的兄弟协程。它更适合一些独立不相干的任务。
同样,用几个例子来说明一下:
我们先定义一个错误处理器:
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
print("Throws an exception, ${coroutineContext[CoroutineName]}, message: ${throwable.message}")
}
接着,在coroutineScope
作用域中启动一个协程:
MainScope().launch(exceptionHandler + CoroutineName("coroutine 1")) { // 1⃣️
coroutineScope {
launch(exceptionHandler + CoroutineName("coroutine 2")) { // 2⃣️
throw IllegalArgumentException("oops!")
}
}
}
最终的结果是:
Throws an exception, CoroutineName(coroutine 1), message: oops!
如果把上面这个例子中的coroutineScope
换成supervisorScope
,那么结果就变成了:
Throws an exception, CoroutineName(coroutine 2), message: oops!
在上面这个🌰中,如果我们使用supervisorScope
,协程2⃣️发生的错误首先尝试向上传播到1⃣️,但是因为作用域的限制,父协程不响应异常处理,因此最终交给2⃣️处的异常处理器处理。
但是,需要注意的是,supervisorScope
只作用域其直接子协程。也就是说,在supervisorScope
的子协程中再创建的子协程,遵守默认的作用域,也就是coroutineScope
。
MainScope().launch(exceptionHandler + CoroutineName("coroutine 1")) { // 1⃣️
supervisorScope {
launch(exceptionHandler + CoroutineName("coroutine 2")) { // 2⃣️
launch(exceptionHandler + CoroutineName("coroutine 3")) { // 3⃣️
throw IllegalArgumentException("oops!")
}
}
}
}
这段代码运行的结果是:
Throws an exception, CoroutineName(coroutine 2), message: oops!
这里的协程3⃣️是supervisorScope
的子协程的子协程,因此,在3⃣️处发生的异常仍然会向上传播,最终被2⃣️捕捉到。
3.8、互斥
协程本质上仍是在线程上运行的。既然是在线程上,那么一定会有同步的问题。常见的解决线程之间同步问题的工具有:
-
synchronized
关键字 -
ReentrantLock
等java.util.concurrent.locks包中的锁 -
AtomicInteger
等java.util.concurrent.atomic包中的原子类 -
ConcurrentHashMap
等线程安全的集合
除此之外,kotlin还为我们提供了两个工具:
3.8.1、Mutex
synchronized
等关键字,在获取不到锁的时候会阻塞线程。而Mutex
通过挂起函数,在没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞就可以执行其他逻辑。
mutex.withLock {
counter++
}
3.8.2、ThreadLocal
Java提供了ThreadLocal
用来保存线程局部数据,每个线程中的值都是单独的。协程中同样可以通过.asContextElement
实现协程版的局部数据。
val threadLocal = ThreadLocal<String>()
GlobalScope.launch(threadLocal.asContextElement("initial value")) {
print("thread: ${Thread.currentThread()} threadLocal: ${threadLocal.get()}")
yield()
print("thread: ${Thread.currentThread()} threadLocal: ${threadLocal.get()}")
}
结果如下:
thread: Thread[DefaultDispatcher-worker-1,5,main] threadLocal: initial value
thread: Thread[DefaultDispatcher-worker-3,5,main] threadLocal: initial value
可以看到,虽然线程切换了,但是从threadLocad中获取的值并没有发生改变。
3.9、Android最佳实践
Kotlin官网上给出了一个Android的最佳实践的例子,我们可以定义一个抽象的Activity
,通过重写onDestroy
方法,在退出的时候取消所有的协程,避免内存泄漏:
abstract class ScopedActivity: Activity(), CoroutineScope by MainScope(){
override fun onDestroy() {
super.onDestroy()
cancel()
}
}
想要启动一个协程,我们直接调用launch
方法就好。
launch {
// do something you want
}
4、协程源码解析
4.1、suspend
修饰符
suspend
是一个修饰符,含义是挂起,可以用在任何函数上。在开篇我们举例的的时候曾提到过:
suspend fun backupLazy() = {
// here for you to get LazyInfo
}
然而实际上,suspend
修饰符并不能实现挂起的操作。这个关键字,并不会帮助我们切换线程。它仅仅是提示函数的调用者,这是一个耗时的函数,因此需要放到协程中执行,仅此而已。
4.2、Continuation
续体
我们来看一下kotlin中协程的接口是怎么写的:
public interface Continuation<in T> {
public fun resumeWith(result: Result<T>)
}
这个resumeWith
可能还不是很明显,但是如果看到下面两个扩展函数:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
与我们常见的callback
接口的定义对比一下:
public interface Callback<T> {
void onFail(int code, String message);
void onSuc(T response);
}
是不是感觉一模一样!onSuc
和onFail
分别于resumeWith
和resumeWIthException
相互对应。所以,有些人说协程本质上是回调,确实也是这么一回事。
假设我们在主线程创建了一个协程并绑定到IO线程,那么在协程启动后,这个协程就从主线程上剥离了。主线程会继续执行剩下的代码,而与IO线程绑定的这个协程执行完后,就通过这个resumeWith
的方法,重新绑定到主线程上执行。
在协程官方文档上,把这个Continuation
称之为续体。在编译的过程中,一个完整的协程被分割切块成一个又一个续体。每一次挂起之后,都会对应着一次resumeWith
恢复。
4.3、CPS 续体传递风格
CPS的全称是Continuation-Passing-Style,翻译为续体传递风格,这是个有点抽象的概念。
我们先看一下挂起的函数声明:
suspend fun <T> CompletableFuture<T>.await(): T
在经过所谓的CPS变换后,它的函数签名变成了这样:
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
这种变换,就成为CPS变换。主要的变化有两点:
- 续体作为参数传入
await
方法 - 返回值变成了Any?
其中,第二点变化是因为,CPS变换后,这个函数除了要返回它本身的返回值,还要返回一个标记——COROUTINE_SUSPENDED
(我们会在下文看到),这个标记返回时,表示表示这个挂起函数会发生事实上的挂起操作。
什么叫事实上的挂起操作?我们举个例子:
val deferred = async { // 1⃣️
// do something
}
delay(1000)
deferred.await() // 2⃣️
上述代码执行到标记为2⃣️的地方,是否挂起取决于协程(1⃣️处创建)是否执行完成。如果已经执行完成了,那么2⃣️直接把结果拿过来用就行。如果没有完成,那么2⃣️就会发生事实上的挂起,等待协程执行完毕。
4.4、ContinuationInterceptor
续体拦截器
上面已经介绍了协程续体和CPS变换。我们创建的协程,在每一个挂起点,都对应着一次resumeWith
的操作。那协程又是怎么绑定到线程上执行的?
这就涉及到了ContinuationInterceptor
。
ContinuationInterceptor
是拦截器,拦截器可以做的事情有很多,切换线程只是它可以实现的目的之一。我们也可以实现自己的拦截器,用来打印一些日志。
照例先看一下ContinuationInterceptor
的定义:
public interface ContinuationInterceptor : CoroutineContext.Element {
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
public fun releaseInterceptedContinuation(continuation: Continuation<*>)
}
我列举了两个最重要的函数。其中,interceptContinuation
会在ContinuationImpl
中被调用,从而实现拦截的目的。
val intercepted = continuation.context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation
4.4.1、HandlerContext
还记得前文提到的Dispatchers.Main
吗?在Android平台,它的实现就是HandlerContext
。HandlerContext
也是继承自ContinuationInterceptor
。这里,用HandlerContext
来举个例子,说明在Android平台,协程是怎么切换到主线程中执行的。
大段的代码就不列举了,我们精简一下,只看它的一个函数:
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
好嘛!万变不离其宗,最终还是回到了我们熟悉的Handler
。
这也说明了,协程本质上只是编译器的一层封装。它底层所依赖的,仍旧是Java的线程池(例如Dispatchers.IO
),或是平台的一些特性(如Handler
)。
4.5、状态机
出于性能的考虑,协程在编译挂起函数时会将函数体编译为状态机,这样可以避免创建过多的类和对象。
我们看一个例子:
val a = a()
val y = foo(a).await() // 挂起点 #1
b()
val z = bar(a, y).await() // 挂起点 #2
c(z)
上面的代码有2个挂起点。它在编译为java的字节码后,大致是这样的:
class <anonymous_for_state_machine> extends SuspendLambda<...> {
// 状态机当前状态
int label = 0
// 协程的局部变量
A a = null
Y y = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException()
L0:
// 这次调用,result 应该为空
a = a()
label = 1
result = foo(a).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L1:
// 外部代码传入 .await() 的结果恢复协程
y = (Y) result
b()
label = 2
result = bar(a, y).await(this) // 'this' 作为续体传递
if (result == COROUTINE_SUSPENDED) return // 如果 await 挂起了执行则返回
L2:
// 外部代码传入 .await() 的结果恢复协程
Z z = (Z) result
c(z)
label = -1 // 没有其他步骤了
return
}
}
注1:上面的代码是伪代码。
注2:以上代码摘自Kotlin的官方设计文档
一个挂起函数会被编译成一个匿名类,这个匿名类中的resumeWith
函数实现了状态机。成员变量label
代表了当前状态机的状态,每一个续体(即挂起点中间的部分以及挂起点与函数头尾之间的部分)都各自对应了一个状态,当函数运行到每个挂起点时,label
的值都受限会发生改变,并且当前的续体(也就是代码中的this)都会作为实参传递给发生了CPS变换的挂起函数,如果这个挂起函数没有发生事实上的挂起,函数继续运行,如果发生了事实上的挂起,则函数直接return
。
由于label
记录了状态,所以,在协程恢复的时候,可以根据状态使用goto
语句直接跳转至上次的挂起点并向后执行,这就是协程挂起的原理。
顺便提一句,虽然Java中没有goto
语句,但是class字节码中支持goto
。
5、协程高级应用
Kotlin中的协程还提供了Channel
、Flow
等API可供调用。
Channel
与Select
的结合可以实现协程版的NIO,Flow
则是Kotlin协程与响应式编程模型结合的产物,同样分为发射数据的上游、接收数据的下游以及连接上下游的操作符三个部分。
6、总结
通过上面的源码分析,我们可以发现,协程是基于线程实现的一层更加上层的API,这个API能够帮助我们用同步的方式写出异步执行的代码。
除此之外,它好像也没有什么特别神秘的东西了。本质上,协程仍旧是基于线程的,它并不是什么空中楼阁,可以凭空存在。
6.1、协程真的更加高效吗
还剩下的一个问题是,协程真的更加高效吗?
在Kotlin的官方文档上,有这样一个例子,创建了10w个协程与10w个线程进行对比。此时,协程并没有什么问题,而线程则直接发生了OOM。
repeat(100_000) {
launch {
delay(1000L)
print(".")
}
}
repeat(100_000) {
thread {
Thread.sleep(1000L)
print(".")
}
}
然而,通过前面的分析,我们已经明白,协程本质上是一层更上的API而已。如果考虑API的开销,协程可能会比直接使用原生的Handler
或者Java
的线程池更慢,并不存在性能上的优势。
官网的例子的误导性在于,10w个协程最终都是绑定到线程池上的,并不是真正创建了10w个线程。如果不是创建10w个线程而是使用线程池,那么就不会发生OOM了。
使用下面两个测试程序,在小米MIX 2进行测试,使用JAVA 线程池的API的性能是协程的2~3倍。这部分的开销。可见,协程对比原生的线程池,并没有什么性能上的优势。
private const val REPEATE_TIMES = 100_000
private const val DELAY_TIME = 1000L
// 3800ms
private fun coroutinesTest() {
val startTime = System.currentTimeMillis()
val count = AtomicInteger()
val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
GlobalScope.launch {
repeat(REPEATE_TIMES) {
launch(dispatcher) {
delay(DELAY_TIME)
doIf(count.incrementAndGet() == REPEATE_TIMES) {
val endTime = System.currentTimeMillis()
print(endTime - startTime)
}
}
}
}
}
// 1400ms
private fun threadTest() {
val startTime = System.currentTimeMillis()
val count = AtomicInteger()
val executors = Executors.newSingleThreadScheduledExecutor()
repeat(REPEATE_TIMES) { executors.schedule({
doIf(count.incrementAndGet() == REPEATE_TIMES) {
val endTime = System.currentTimeMillis()
print(endTime - startTime)
}
}, DELAY_TIME, TimeUnit.MICROSECONDS) }
}