1. 引言
前文提及了join
函数,那么进一步的便是协程中非常强大的结构化并发设计了。
结构化并发(structured concurrency)从字面上并不直观能理解,本文将通过实践代码来介绍。
2. 巧妙地等待协程完成
前面有所介绍,希望等待某一个协程执行完成时,可以使用Job#join()
,那么自然而言地,如果需要等个多个协程,那么就将每次启动的协程Job对象收集或保存起来再逐个调用即可。
嗯,这样的确可以在功能上达到目的。但是,想一下,在协程取消刚开始时我们是从Job维度去实现取消的,这当然可以实现取消的需要,但是在进行到协程作用域部分的实践时,便是不再通过Job的维度去取消,而是通过协程作用域的维度。那么,可以想想,在等待协程完成中是否也有这种功能呢?
事实上,却是可以有更好的选择,那便是结构化并发。
上代码前,假定一个场景吧:某一时刻要进行三个耗时的操作,三个操作之间互不干扰,三个操作都执行完成后再进行下一步执行。
这里假定的耗时操作实践代码如下:
private suspend fun testIOCoroutine(calledMsg: String) {
"Coroutine IO runs ($calledMsg)".let {
myLog(it)
}
/* 仅为样例代码,休眠线程其实是非常非常不建议的做法!! */
Thread.sleep(randomMilli)
"Coroutine IO runs after thread sleep ($calledMsg)".let {
myLog(it)
}
}
本质上是以将当前线程休眠一个随机的时间,为了使时间代码更有趣些,每次调用时休眠的时间这里其实是5000毫秒到10000毫秒之间的随机值:
val randomMilli: Long
get() = (FIVE_SECONDS..TEN_SECONDS).random()
接下来按照假定的场景,分别启动三个协程进行三次耗时操作调用,并且有相应的log输出代码:
private fun launchStructuredBtnClicked() {
"launchStructuredBtnClicked".let {
myLog(it)
}
job?.cancel()
job = scope.launch {
"Coroutine Main runs (launchStructuredBtnClicked)".let {
myLog(it)
}
launch(Dispatchers.IO) {
testIOCoroutine("launchStructuredBtnClicked A")
}
launch(Dispatchers.IO) {
testIOCoroutine("launchStructuredBtnClicked B")
}
launch(Dispatchers.IO) {
testIOCoroutine("launchStructuredBtnClicked C")
}
"Coroutine Main runs final statement (launchStructuredBtnClicked)".let {
myLog(it)
}
}
}
这里先保留个问题:明明只需要3个协程去执行代码,为什么这里启动了4个协程?
可以看到,只有最外面scope.launch
启动的协程Job对象被记录下来了,里面启动的3个协程Job对象并没有被记录。
最后,还是等待协程完成,同样也是Job#join()的调用:
private fun joinBtnClicked() {
"joinBtnClicked".let {
myLog(it)
}
scope.launch(Dispatchers.Main) {
"Coroutine Main runs (joinBtnClicked)".let {
myLog(it)
}
val jobNonNull = job ?: return@launch
jobNonNull.join()
"Coroutine Main runs after join() (joinBtnClicked)".let {
myLog(it)
}
}
}
先点击launchStructuredBtn再点击joinBtn的话,某一次执行的结果log输出如下:
20:35:26.665 D/chenhj: launchStructuredBtnClicked ::running in Thread:[id:2][name:main]
20:35:26.687 D/chenhj: Coroutine Main runs (launchStructuredBtnClicked) ::running in Thread:[id:2][name:main]
20:35:26.688 D/chenhj: Coroutine IO runs (launchStructuredBtnClicked A) ::running in Thread:[id:3037][name:DefaultDispatcher-worker-3]
20:35:26.688 D/chenhj: Coroutine IO runs (launchStructuredBtnClicked B) ::running in Thread:[id:3036][name:DefaultDispatcher-worker-2]
20:35:26.688 D/chenhj: Coroutine Main runs final statement (launchStructuredBtnClicked) ::running in Thread:[id:2][name:main]
20:35:26.688 D/chenhj: Coroutine IO runs (launchStructuredBtnClicked C) ::running in Thread:[id:3039][name:DefaultDispatcher-worker-5]
20:35:27.843 D/chenhj: joinBtnClicked ::running in Thread:[id:2][name:main]
20:35:27.847 D/chenhj: Coroutine Main runs (joinBtnClicked) ::running in Thread:[id:2][name:main]
20:35:32.726 D/chenhj: Coroutine IO runs after thread sleep (launchStructuredBtnClicked C) ::running in Thread:[id:3039][name:DefaultDispatcher-worker-5]
20:35:33.684 D/chenhj: Coroutine IO runs after thread sleep (launchStructuredBtnClicked A) ::running in Thread:[id:3037][name:DefaultDispatcher-worker-3]
20:35:36.475 D/chenhj: Coroutine IO runs after thread sleep (launchStructuredBtnClicked B) ::running in Thread:[id:3036][name:DefaultDispatcher-worker-2]
20:35:36.478 D/chenhj: Coroutine Main runs after join() (joinBtnClicked) ::running in Thread:[id:2][name:main]
通过日志可以证明关键点,虽然只等待一个Job对象完成,事实上,也会等待到里面A/B/C三个协程完成。
事实上,可以将所被记录下来的Job对象,其实为父协程,里面所启动的A/B/C三个协程则为子协程。
协程完成的条件,概述为一下:
- 当其没有子协程时,完成状态的条件是自身执行结束;
- 当其有一个或以上的子协程时,完成状态的条件是 自身执行结束 且 所有的子协程都处于完成状态;
以上例子可以证明,变量job的最后一行代码早早已经执行结束,但是调用join()后的log,必然会等待到A/B/C三个子协程都执行结束输出最后一行log后才会输出。
事实上,join
函数获得恢复的时间点,是变量job协程执行完成时间点、协程A执行完成时间点、协程B执行完成时间点、协程C完成时间点,这四个时间点中最晚的时间点。
这里,本质上便是使用了协程结构化并发,只要把握住了父协程去等待,那么父协程的完成会结构化地等待子协程完成,这样,管理维度将不会是一个个独立的Job对象,而是利用了结 构化的关系简化对Job对象的使用。
3. 进阶版等待多协程完成
这里其实也还是麻烦,因为还是要保存一个Job对象来调用join函数。事实上,还真有进一步的优化使用。
接下来的主角便是挂起函数supervisorScope
,上代码:
private fun supervisorScopeBtnClicked() {
"supervisorScopeBtnClicked".let {
myLog(it)
}
job?.cancel()
job = scope.launch {
"Coroutine Main runs (supervisorScopeBtnClicked)".let {
myLog(it)
}
supervisorScope {
"supervisorScope lambda runs (supervisorScopeBtnClicked)".let {
myLog(it)
}
launch(Dispatchers.IO) {
testIOCoroutine("supervisorScopeBtnClicked A")
}
launch(Dispatchers.IO) {
testIOCoroutine("supervisorScopeBtnClicked B")
}
launch(Dispatchers.IO) {
testIOCoroutine("supervisorScopeBtnClicked C")
}
"supervisorScope lambda runs final statement (supervisorScopeBtnClicked)".let {
myLog(it)
}
}
"Coroutine Main runs final statement (supervisorScopeBtnClicked)".let {
myLog(it)
}
}
}
好像,这里不也还是保存了启动的Job对象?先别急,一步步来,现在的关键是,只点击supervisorScopeBtn而不去点击joinBtn,产生的log如下:
21:16:08.578 D/chenhj: supervisorScopeBtnClicked ::running in Thread:[id:2][name:main]
21:16:08.625 D/chenhj: Coroutine Main runs (supervisorScopeBtnClicked) ::running in Thread:[id:2][name:main]
21:16:08.628 D/chenhj: supervisorScope lambda runs (supervisorScopeBtnClicked) ::running in Thread:[id:2][name:main]
21:16:08.629 D/chenhj: Coroutine IO runs (supervisorScopeBtnClicked A) ::running in Thread:[id:3036][name:DefaultDispatcher-worker-2]
21:16:08.629 D/chenhj: Coroutine IO runs (supervisorScopeBtnClicked B) ::running in Thread:[id:3037][name:DefaultDispatcher-worker-3]
21:16:08.630 D/chenhj: Coroutine IO runs (supervisorScopeBtnClicked C) ::running in Thread:[id:3035][name:DefaultDispatcher-worker-1]
21:16:08.631 D/chenhj: supervisorScope lambda runs final statement (supervisorScopeBtnClicked) ::running in Thread:[id:2][name:main]
21:16:16.711 D/chenhj: Coroutine IO runs after thread sleep (supervisorScopeBtnClicked C) ::running in Thread:[id:3035][name:DefaultDispatcher-worker-1]
21:16:16.826 D/chenhj: Coroutine IO runs after thread sleep (supervisorScopeBtnClicked B) ::running in Thread:[id:3037][name:DefaultDispatcher-worker-3]
21:16:16.945 D/chenhj: Coroutine IO runs after thread sleep (supervisorScopeBtnClicked A) ::running in Thread:[id:3036][name:DefaultDispatcher-worker-2]
21:16:17.050 D/chenhj: Coroutine Main runs final statement (supervisorScopeBtnClicked) ::running in Thread:[id:2][name:main]
这次的关键点是,在supervisorScope
函数后面的代码(log输出),会在A/B/C三个协程执行完成后才会执行!
挂起函数supervisorScope
很实用的一个点便是,会挂起当前协程直到其产生子协程作用域启动的所有协程均执行完成后再恢复当前协程。
说起来是有点绕,不妨自己拿前面的代码或者demo代码自行阅读体会一下。
这里已经不需要Job#join()了,直接在supervisorScope
后面执行的代码,就已经确保了子协程的执行完成!
其实吧,这里的代码的细节顺序还可以再理理,比如哪些是保证了顺序的,哪些是不保证顺序的。协程,毕竟是个异步开发的内容,所以代码执行逻辑与顺序,很重要!
同样的,这里也把启动的job保存了,所以在joinBtn中也可以进一步确认效果。
4. 样例工程代码
代码样例Demo,见Github:https://github.com/TeaCChen/CoroutineStudy
本文示例代码,如觉奇怪或啰嗦,其实为StructuredStepOneActivity.kt
中的代码摘取主要部分说明,在demo代码当中,为提升细节内容,有更加多的封装和输出内容。
本文的页面截图示例如下:
5. 补充说明
结构化并发,属于协程设计里的内容,本文的内容仅是初步地了解。
结构化并发这个功能是在设计上是强大的,比如前面的通过协程作用域去取消协程的方式其实也属于结构化并发的内容,本文只在等待协程完成的角度去引出结构化并发的内容,事实上,结构化并发的内容还有很多,比如异常处理的传递、调度器的传递等。
这里必须强调一个内容,协程之间产生父子关系的关键是协程作用域(更根本上说,是协程作用域中的协程上下文的Job对象),而并不是启动协程的层级!
协程的内容是丰富而强大的,学习和使用的过程中不要指望一蹴而就。
一学就会的协程使用——基础篇(七)初识结构化(本文)