注:本文中使用
runBlocking
是为了方便测试,业务开发中禁止使用
一、协程基础
1、创建协程的三种方式
(1) 使用 runBlocking
顶层函数(业务开发中不会用到这种方法,因为它是线程阻塞的,一般用于单元测试)。
val runBlock = runBlocking {}
(2) 使用 GlobalScope
单例对象创建(Android 开发中不推荐这种用法,因为它的生命周期只受整个应用程序的生命周期限制,且不能取消)。
val globalScopeLaunch = GlobalScope.launch {}
val globalScopeAsync = GlobalScope.async {}
(3) 自行通过 CoroutineScope
创建。
val coroutineScopeLaunch = CoroutineScope(SupervisorJob() + Dispatchers.Default).launch {}
val coroutineScopeAsync = CoroutineScope(SupervisorJob() + Dispatchers.Default).async {}
2、简单示例
fun testCoroutine() = runBlocking {
launch {
delay(1000)
println("hello")
delay(2000)
println("world")
}
println("test1")
println("test2")
// test1
// test2
// hello
// world
}
fun testCoroutine()2 = runBlocking {
val job = launch {
delay(1000)
println("hello")
delay(2000)
println("world")
}
println("test1")
job.join() // 显示地等待 job 执行结束
println("test2")
// test1
// hello
// world
// test2
}
3、取消协程
fun cancelCoroutine() = runBlocking {
val job = launch {
repeat(1000) {
println("job: test $it ...")
delay(500)
}
}
delay(1300)
println("main: ready to cancel !")
job.cancel() // 取消作业
job.join() // 等待作业执行结束
// job.cancelAndJoin()
println("main: Now cancel.")
// job: test 0 ...
// job: test 1 ...
// job: test 2 ...
// main: ready to cancel !
// main: Now cancel.
}
如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的。
fun canNotCancelCoroutine() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: hello ${i++} ...")
nextPrintTime += 500
}
}
}
delay(1300)
println("main: ready to cancel !")
job.cancelAndJoin()
println("main: now canceled.")
// job: hello 0 ...
// job: hello 1 ...
// job: hello 2 ...
// main: ready to cancel !
// job: hello 3 ...
// job: hello 4 ...
// main: now canceled.
}
加入状态判断 isActive
或者使用状态检查 ensureActive()
再或者使用 yield()
(内部会先检查状态)来确保取消协程。
fun ensureCancelCoroutine() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5 && isActive) {
// ensureActive()
// yield()
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: hello ${i++} ...")
nextPrintTime += 500
}
}
}
delay(1300)
println("main: ready to cancel !")
job.cancelAndJoin()
println("main: now canceled.")
// job: hello 0 ...
// job: hello 1 ...
// job: hello 2 ...
// main: ready to cancel !
// main: now canceled.
}
4、等待协程执行结果
fun asyncCoroutine() = runBlocking {
val deferred = async {
delay(2000)
"async result"
}
val result = deferred.await()
println("deferred: $result")
// deferred: async result
}
5、异常处理
(1)try - catch - finally
fun catchExCoroutine() = runBlocking {
val job = launch {
try {
delay(200)
println("try...")
throw NullPointerException()
} catch (e: Exception) {
println("exception: ${e.message}")
} finally {
println("finally...")
}
}
delay(1000)
println("cancel")
job.cancel()
println("done")
// try...
// exception: null
// finally...
// cancel
// done
}
(2)CoroutineExceptionHandler
fun handleExCoroutine() = runBlocking {
val job = launch(CoroutineExceptionHandler { coroutineContext, throwable ->
println("exception: ${throwable.message}")
}) {
delay(200)
println("try...")
throw NullPointerException()
}
delay(1000)
println("cancel")
job.cancel()
println("done")
// try...
// Exception in thread "main" java.lang.NullPointerException
// at com.wf.kotlin.study.11协程._01_协程CoroutineKt$handleExCoroutine$1$job$2.invokeSuspend(01.协程Coroutine.kt:186)
// ......
}
6、协程的超时
fun timeoutCoroutine() = runBlocking {
withTimeout(300) {
println("start...")
delay(100)
println("progress 1...")
delay(100)
println("progress 2...")
delay(100)
println("progress 3...")
delay(100)
println("progress 4...")
delay(100)
println("progress 5...")
println("end")
}
// start...
// progress 1...
// progress 2...
// Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
// at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
// ......
}
二、并发与挂起函数
1、使用 async 并发
fun asyncConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("start")
val a = async(Dispatchers.IO) {
printlnWithThread()
delay(1000)
1
}
val b = async(Dispatchers.IO) {
printlnWithThread()
delay(2000)
2
}
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:36:07 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:36:09 CST 2023 Thread -> id: 1, name: main, time: 2065
}
2、惰性启动 async
(1)通过将 start
参数设置为 CoroutineStart.LAZY
变成惰性的;
(2)在这个模式下,调用 await
获取协程执行结果或者调用 Job
的 start
方法时,协程才会启动。
fun lazyAsyncConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("enter")
val a = async(Dispatchers.IO, CoroutineStart.LAZY) {
printlnWithThread()
delay(1000)
1
}
val b = async(Dispatchers.IO, CoroutineStart.LAZY) {
printlnWithThread()
delay(2000)
2
}
delay(1000)
printlnWithThread("start")
a.start()
b.start()
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:24:32 CST 2023 Thread -> id: 1, name: main, enter
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:24:33 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:24:35 CST 2023 Thread -> id: 1, name: main, time: 3071
}
3、挂起函数
- 定义:
(1)使用suspend
关键字修饰的函数成为挂起函数;
(2)挂起函数只能在另一个挂起函数,或者协程中被调用;
(3)在挂起函数中可以调用普通函数(非挂起函数)。 - 实现:实现挂起的的目的是让程序脱离当前的线程,也就是要 切线程
(1)给函数加上suspend
关键字;
(2)如果是耗时操作,则使用withContext(Dispatchers.IO)
切换线程去执行耗时操作;
(3)如果是延时操作,则调用delay(100)
函数即可。
suspend fun callA(): Int {
printlnWithThread() // 调用普通函数
delay(1000) // 调用挂起函数
return 1
}
fun suspendFunConcurrent() = runBlocking {
val time = measureTimeMillis {
printlnWithThread("start")
val a = async(Dispatchers.IO) {
callA()
}
val b = async(Dispatchers.IO) {
printlnWithThread()
delay(2000)
2
}
printlnWithThread("a + b = ${a.await() + b.await()}")
printlnWithThread("end")
}
printlnWithThread("time: $time")
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1,
// Wed Feb 22 14:33:58 CST 2023 Thread -> id: 15, name: DefaultDispatcher-worker-3,
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, a + b = 3
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, end
// Wed Feb 22 14:34:00 CST 2023 Thread -> id: 1, name: main, time: 2074
}
三、协程上下文和作用域
1、协程上下文 CoroutineContext
(1)协程上下文包含当前协程 scope
的信息, 比如 Job
, Dispatcher
, ContinuationInterceptor
, CoroutineName
和 CoroutineId
;
(2)在 CoroutineContext
中,是用 map
来存这些信息的, map
的键是这些类的 伴生对象,值是这些类的一个实例。
fun jobCoroutineContext() = runBlocking {
val job = launch(Dispatchers.Default + CoroutineName("test")) {
printlnWithThread("job: ${this.coroutineContext[Job]}, ${this.coroutineContext[CoroutineName]}")
}
printlnWithThread("job: $job")
printlnWithThread("job: ${job[Job]}")
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, job: StandaloneCoroutine{Active}@7d0587f1, CoroutineName(test)
// Wed Feb 22 15:29:56 CST 2023 Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
}
2、协程作用域 CoroutineScope
(1)CoroutineScope
的代码很简单,主要作用是提供 CoroutineContext
, 启动协程需要 CoroutineContext
;
(2)作用域可以管理其域内的所有协程,一个 CoroutineScope
可以有许多的子 scope
;
(3)协程内部是通过 CoroutineScope.coroutineContext
自动继承自父协程的上下文;
(4)一个父协程总是等待所有的子协程执行结束,取消父协程会取消所有的子协程;
(5)默认情况下,协程内,某个子协程抛出一个非 CancellationException
异常,未被捕获,会传递到父协程;
(6)任何一个子协程异常退出,那么整体都将退出。
fun myCoroutineScope() = runBlocking {
val dispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
val myScope = CoroutineScope(dispatcher)
myScope.launch {
printlnWithThread()
}
// Wed Feb 22 15:30:48 CST 2023 Thread -> id: 13, name: pool-1-thread-1,
}
3、SupervisorJob
(1)SupervisorJob 与 Job 基本类似,区别在于父协程和兄弟协程不会被此子协程的异常和取消所影响;
(2)适合一些独立不相干的任务,任何一个任务出问题,并不会影响其他任务的工作。
fun supervisorJob() = runBlocking(CoroutineExceptionHandler { _, throwable ->
printlnWithThread(throwable.message ?: "throwable")
}) {
launch(Dispatchers.Default + SupervisorJob()) {
printlnWithThread("child job")
throw NullPointerException()
}
launch {
delay(1000)
printlnWithThread("brother job")
}
delay(3000)
printlnWithThread("end")
// Wed Feb 22 14:50:19 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, child job
// Wed Feb 22 14:50:19 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, ExceptionHandler
// Wed Feb 22 14:50:20 CST 2023 Thread -> id: 1, name: main, brother job
// Wed Feb 22 14:50:22 CST 2023 Thread -> id: 1, name: main, end
}
四、协程并发和同步
1、Volatile
(1)保证此变量对所有的线程的可见性(跳过 cpu cache 读取主存);
(2)禁止指令重排序优化;
(3)不会阻塞线程;
(4)不能保证原子性(即不++、--等非一次原子操作中不能同步)。
@Volatile
var flag1 = true
fun volatileFun() {
printlnWithThread("start")
Thread {
flag1 = false
printlnWithThread("flag1 = $flag1")
}.start()
while (flag1) {
printlnWithThread("flag1 = $flag1")
}
printlnWithThread("end")
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, flag1 = true
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 13, name: Thread-0, flag1 = false
// Wed Feb 22 15:08:52 CST 2023 Thread -> id: 1, name: main, end
}
2、synchronized
var flag2 = true
fun synchronizedFun() {
printlnWithThread("start")
Thread {
flag2 = false
printlnWithThread("flag2 = $flag2")
}.start()
while (flag2) {
synchronized(flag2) {
printlnWithThread("flag2 = $flag2")
}
}
printlnWithThread("end")
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, flag2 = true
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 13, name: Thread-0, flag2 = false
// Wed Feb 22 15:08:10 CST 2023 Thread -> id: 1, name: main, end
}
3、使用线程安全的数据结构 Atomicxxxx、ReentrantLock等
var count = AtomicInteger()
fun atomicFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
count.incrementAndGet()
}
}
}
launch {
delay(3000)
printlnWithThread("count = ${count.get()}")
printlnWithThread("end")
}
// Wed Feb 22 15:14:40 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:14:43 CST 2023 Thread -> id: 24, name: DefaultDispatcher-worker-12, count = 100000
// Wed Feb 22 15:14:43 CST 2023 Thread -> id: 24, name: DefaultDispatcher-worker-12, end
}
var lock = ReentrantLock()
var count2 = 0
fun lockFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
lock.lock()
try {
count2++
} finally {
lock.unlock()
}
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count2")
printlnWithThread("end")
}
// Wed Feb 22 15:17:45 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:17:48 CST 2023 Thread -> id: 30, name: DefaultDispatcher-worker-18, count = 100000
// Wed Feb 22 15:17:48 CST 2023 Thread -> id: 30, name: DefaultDispatcher-worker-18, end
}
4、协程专属锁 Mutex
(1)它具有 lock
和 unlock
方法,关键的区别在于, Mutex.lock()
是一个挂起函数,它不会阻塞当前线程;
(2)还有 withLock
扩展函数,可以方便的替代常用的 mutex.lock()
、try { …… } finally { mutex.unlock() }
模式。
var mutex = Mutex()
var count3 = 0
fun mutexFun() = runBlocking(Dispatchers.IO) {
printlnWithThread("start")
repeat(100) {
launch {
repeat(1000) {
mutex.withLock {
count3++
}
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count3")
printlnWithThread("end")
}
// Wed Feb 22 15:19:44 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:19:47 CST 2023 Thread -> id: 79, name: DefaultDispatcher-worker-67, count = 100000
// Wed Feb 22 15:19:47 CST 2023 Thread -> id: 79, name: DefaultDispatcher-worker-67, end
}
5、限制线程(单线程)
val countContext = newSingleThreadContext("countContext")
var count4 = 0
fun singleThreadFun() = runBlocking {
printlnWithThread("start")
withContext(countContext) {
repeat(100) {
launch {
repeat(1000) {
count4++
}
}
}
launch {
delay(3000)
printlnWithThread("count = $count4")
printlnWithThread("end")
}
}
// Wed Feb 22 15:21:27 CST 2023 Thread -> id: 1, name: main, start
// Wed Feb 22 15:21:30 CST 2023 Thread -> id: 13, name: countContext, count = 100000
// Wed Feb 22 15:21:30 CST 2023 Thread -> id: 13, name: countContext, end
}
6、使用 Actors(后续介绍的热流 Channel,类似于 java 的 BlockingQueue)
(1)一个 actor
是由 协程、 被限制并封装到该协程中的 状态,以及一个与其它协程通信的 通道 组合而成的一个实体;
(2)CoroutineScope.actor()
方法返回的是一个 SendChannel
对象。
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun actorsFun() = runBlocking(Dispatchers.IO) {
// Channel 管道接收消息并处理
val counterActor = actor<CounterMsg> {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
printlnWithThread("start")
// 发送消息到 Channel 增加计数
repeat(100) {
launch {
repeat(1000) {
counterActor.send(IncCounter)
}
}
}
// 延时3s后,发送消息到 Channel 获取计数结果
launch {
delay(3000)
// 发送一条获取值的消息
val resp = CompletableDeferred<Int>()
counterActor.send(GetCounter(resp))
printlnWithThread("counter = ${resp.await()}")
counterActor.close()
printlnWithThread("end")
}
// Wed Feb 22 15:25:07 CST 2023 Thread -> id: 13, name: DefaultDispatcher-worker-1, start
// Wed Feb 22 15:25:10 CST 2023 Thread -> id: 77, name: DefaultDispatcher-worker-64, counter = 100000
// Wed Feb 22 15:25:10 CST 2023 Thread -> id: 77, name: DefaultDispatcher-worker-64, end
}
注:自定义线程信息打印函数如下:
fun printlnWithThread(msg: String = "") {
val thread = Thread.currentThread()
println("${Date()} Thread -> id: ${thread.id}, name: ${thread.name}, $msg")
}