Kotlin语言(十一):协程 Coroutine

注:本文中使用 runBlocking 是为了方便测试,业务开发中禁止使用

一、协程基础

1、创建协程的三种方式

(1) 使用 runBlocking 顶层函数(业务开发中不会用到这种方法,因为它是线程阻塞的,一般用于单元测试)。

val runBlock = runBlocking {}

(2) 使用 GlobalScope 单例对象创建(Android 开发中不推荐这种用法,因为它的生命周期只受整个应用程序的生命周期限制,且不能取消)。

val globalScopeLaunch = GlobalScope.launch {}
val globalScopeAsync = GlobalScope.async {}

(3) 自行通过 CoroutineScope 创建。

val coroutineScopeLaunch = CoroutineScope(SupervisorJob() + Dispatchers.Default).launch {}
val coroutineScopeAsync = CoroutineScope(SupervisorJob() + Dispatchers.Default).async {}

2、简单示例

fun testCoroutine() = runBlocking {
    launch {
        delay(1000)
        println("hello")
        delay(2000)
        println("world")
    }
    println("test1")
    println("test2")

    // test1
    // test2
    // hello
    // world
}
fun testCoroutine()2 = runBlocking {
    val job = launch {
        delay(1000)
        println("hello")
        delay(2000)
        println("world")
    }
    println("test1")
    job.join() // 显示地等待 job 执行结束
    println("test2")

    // test1
    // hello
    // world
    // test2
}

3、取消协程

fun cancelCoroutine() = runBlocking {
    val job = launch {
        repeat(1000) {
            println("job: test $it ...")
            delay(500)
        }
    }
    delay(1300)
    println("main: ready to cancel !")
    job.cancel() // 取消作业
    job.join()   // 等待作业执行结束
    // job.cancelAndJoin()
    println("main: Now cancel.")

    // job: test 0 ...
    // job: test 1 ...
    // job: test 2 ...
    // main: ready to cancel !
    // main: Now cancel.
}

如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的。

fun canNotCancelCoroutine() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: hello ${i++} ...")
                nextPrintTime += 500
            }
        }
    }
    delay(1300)
    println("main: ready to cancel !")
    job.cancelAndJoin()
    println("main: now canceled.")

    // job: hello 0 ...
    // job: hello 1 ...
    // job: hello 2 ...
    // main: ready to cancel !
    // job: hello 3 ...
    // job: hello 4 ...
    // main: now canceled.
}

加入状态判断 isActive 或者使用状态检查 ensureActive() 再或者使用 yield() (内部会先检查状态)来确保取消协程。

fun ensureCancelCoroutine() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5 && isActive) {
            // ensureActive()
            // yield()
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: hello ${i++} ...")
                nextPrintTime += 500
            }
        }
    }
    delay(1300)
    println("main: ready to cancel !")
    job.cancelAndJoin()
    println("main: now canceled.")

    // job: hello 0 ...
    // job: hello 1 ...
    // job: hello 2 ...
    // main: ready to cancel !
    // main: now canceled.
}

4、等待协程执行结果

fun asyncCoroutine() = runBlocking {
    val deferred = async {
        delay(2000)
        "async result"
    }
    val result = deferred.await()
    println("deferred: $result")

    // deferred: async result
}

5、异常处理

(1)try - catch - finally

fun catchExCoroutine() = runBlocking {
    val job = launch {
        try {
            delay(200)
            println("try...")
            throw NullPointerException()
        } catch (e: Exception) {
            println("exception: ${e.message}")
        } finally {
            println("finally...")
        }
    }
    delay(1000)
    println("cancel")
    job.cancel()
    println("done")

    // try...
    // exception: null
    // finally...
    // cancel
    // done
}

(2)CoroutineExceptionHandler

fun handleExCoroutine() = runBlocking {
    val job = launch(CoroutineExceptionHandler { coroutineContext, throwable ->
        println("exception: ${throwable.message}")
    }) {
        delay(200)
        println("try...")
        throw NullPointerException()
    }
    delay(1000)
    println("cancel")
    job.cancel()
    println("done")

    // try...
    // Exception in thread "main" java.lang.NullPointerException
    //      at com.wf.kotlin.study.11协程._01_协程CoroutineKt$handleExCoroutine$1$job$2.invokeSuspend(01.协程Coroutine.kt:186)
    // ......
}

6、协程的超时

fun timeoutCoroutine() = runBlocking {
    withTimeout(300) {
        println("start...")
        delay(100)
        println("progress 1...")
        delay(100)
        println("progress 2...")
        delay(100)
        println("progress 3...")
        delay(100)
        println("progress 4...")
        delay(100)
        println("progress 5...")
        println("end")
    }

    // start...
    // progress 1...
    // progress 2...
    // Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
    //      at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
    // ......
}


二、并发与挂起函数

1、使用 async 并发

fun asyncConcurrent() = runBlocking {
    val time = measureTimeMillis {
        printlnWithThread("start")
        val a = async(Dispatchers.IO) {
            printlnWithThread()
            delay(1000)
            1
        }
        val b = async(Dispatchers.IO) {
            printlnWithThread()
            delay(2000)
            2
        }
        printlnWithThread("a + b = ${a.await() + b.await()}")
        printlnWithThread("end")
    }
    printlnWithThread("time: $time")

    // Wed Feb 22 14:36:07 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 14:36:07 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1,
    // Wed Feb 22 14:36:07 CST 2023  Thread -> id: 15, name: DefaultDispatcher-worker-3,
    // Wed Feb 22 14:36:09 CST 2023  Thread -> id: 1, name: main, a + b = 3
    // Wed Feb 22 14:36:09 CST 2023  Thread -> id: 1, name: main, end
    // Wed Feb 22 14:36:09 CST 2023  Thread -> id: 1, name: main, time: 2065
}

2、惰性启动 async

(1)通过将 start 参数设置为 CoroutineStart.LAZY 变成惰性的;
(2)在这个模式下,调用 await 获取协程执行结果或者调用 Jobstart 方法时,协程才会启动。

fun lazyAsyncConcurrent() = runBlocking {
    val time = measureTimeMillis {
        printlnWithThread("enter")
        val a = async(Dispatchers.IO, CoroutineStart.LAZY) {
            printlnWithThread()
            delay(1000)
            1
        }
        val b = async(Dispatchers.IO, CoroutineStart.LAZY) {
            printlnWithThread()
            delay(2000)
            2
        }
        delay(1000)
        printlnWithThread("start")
        a.start()
        b.start()
        printlnWithThread("a + b = ${a.await() + b.await()}")
        printlnWithThread("end")
    }
    printlnWithThread("time: $time")

    // Wed Feb 22 14:24:32 CST 2023  Thread -> id: 1, name: main, enter
    // Wed Feb 22 14:24:33 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 14:24:33 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, 
    // Wed Feb 22 14:24:33 CST 2023  Thread -> id: 15, name: DefaultDispatcher-worker-3, 
    // Wed Feb 22 14:24:35 CST 2023  Thread -> id: 1, name: main, a + b = 3
    // Wed Feb 22 14:24:35 CST 2023  Thread -> id: 1, name: main, end
    // Wed Feb 22 14:24:35 CST 2023  Thread -> id: 1, name: main, time: 3071
}

3、挂起函数

  1. 定义:
    (1)使用 suspend 关键字修饰的函数成为挂起函数;
    (2)挂起函数只能在另一个挂起函数,或者协程中被调用;
    (3)在挂起函数中可以调用普通函数(非挂起函数)。
  2. 实现:实现挂起的的目的是让程序脱离当前的线程,也就是要 切线程
    (1)给函数加上 suspend 关键字;
    (2)如果是耗时操作,则使用 withContext(Dispatchers.IO) 切换线程去执行耗时操作;
    (3)如果是延时操作,则调用 delay(100) 函数即可。
suspend fun callA(): Int {
    printlnWithThread()   // 调用普通函数
    delay(1000) // 调用挂起函数
    return 1
}

fun suspendFunConcurrent() = runBlocking {
    val time = measureTimeMillis {
        printlnWithThread("start")
        val a = async(Dispatchers.IO) {
            callA()
        }
        val b = async(Dispatchers.IO) {
            printlnWithThread()
            delay(2000)
            2
        }
        printlnWithThread("a + b = ${a.await() + b.await()}")
        printlnWithThread("end")
    }
    printlnWithThread("time: $time")

    // Wed Feb 22 14:33:58 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 14:33:58 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1,
    // Wed Feb 22 14:33:58 CST 2023  Thread -> id: 15, name: DefaultDispatcher-worker-3,
    // Wed Feb 22 14:34:00 CST 2023  Thread -> id: 1, name: main, a + b = 3
    // Wed Feb 22 14:34:00 CST 2023  Thread -> id: 1, name: main, end
    // Wed Feb 22 14:34:00 CST 2023  Thread -> id: 1, name: main, time: 2074
}


三、协程上下文和作用域

1、协程上下文 CoroutineContext

(1)协程上下文包含当前协程 scope 的信息, 比如 Job, Dispatcher, ContinuationInterceptor, CoroutineNameCoroutineId
(2)在 CoroutineContext 中,是用 map 来存这些信息的, map 的键是这些类的 伴生对象,值是这些类的一个实例。

fun jobCoroutineContext() = runBlocking {
    val job = launch(Dispatchers.Default + CoroutineName("test")) {
        printlnWithThread("job: ${this.coroutineContext[Job]}, ${this.coroutineContext[CoroutineName]}")
    }
    printlnWithThread("job: $job")
    printlnWithThread("job: ${job[Job]}")

    // Wed Feb 22 15:29:56 CST 2023  Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
    // Wed Feb 22 15:29:56 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, job: StandaloneCoroutine{Active}@7d0587f1, CoroutineName(test)
    // Wed Feb 22 15:29:56 CST 2023  Thread -> id: 1, name: main, job: StandaloneCoroutine{Active}@7d0587f1
}

2、协程作用域 CoroutineScope

(1)CoroutineScope 的代码很简单,主要作用是提供 CoroutineContext, 启动协程需要 CoroutineContext
(2)作用域可以管理其域内的所有协程,一个 CoroutineScope 可以有许多的子 scope
(3)协程内部是通过 CoroutineScope.coroutineContext 自动继承自父协程的上下文;
(4)一个父协程总是等待所有的子协程执行结束,取消父协程会取消所有的子协程;
(5)默认情况下,协程内,某个子协程抛出一个非 CancellationException 异常,未被捕获,会传递到父协程;
(6)任何一个子协程异常退出,那么整体都将退出。

fun myCoroutineScope() = runBlocking {
    val dispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
    val myScope = CoroutineScope(dispatcher)
    myScope.launch {
        printlnWithThread()
    }

    // Wed Feb 22 15:30:48 CST 2023  Thread -> id: 13, name: pool-1-thread-1, 
}

3、SupervisorJob

(1)SupervisorJob 与 Job 基本类似,区别在于父协程和兄弟协程不会被此子协程的异常和取消所影响;
(2)适合一些独立不相干的任务,任何一个任务出问题,并不会影响其他任务的工作。

fun supervisorJob() = runBlocking(CoroutineExceptionHandler { _, throwable ->
    printlnWithThread(throwable.message ?: "throwable")
}) {
    launch(Dispatchers.Default + SupervisorJob()) {
        printlnWithThread("child job")
        throw NullPointerException()
    }
    launch {
        delay(1000)
        printlnWithThread("brother job")
    }
    delay(3000)
    printlnWithThread("end")

    // Wed Feb 22 14:50:19 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, child job
    // Wed Feb 22 14:50:19 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, ExceptionHandler
    // Wed Feb 22 14:50:20 CST 2023  Thread -> id: 1, name: main, brother job
    // Wed Feb 22 14:50:22 CST 2023  Thread -> id: 1, name: main, end
}


四、协程并发和同步

1、Volatile

(1)保证此变量对所有的线程的可见性(跳过 cpu cache 读取主存);
(2)禁止指令重排序优化;
(3)不会阻塞线程;
(4)不能保证原子性(即不++、--等非一次原子操作中不能同步)。

@Volatile
var flag1 = true

fun volatileFun() {
    printlnWithThread("start")

    Thread {
        flag1 = false
        printlnWithThread("flag1 = $flag1")
    }.start()

    while (flag1) {
        printlnWithThread("flag1 = $flag1")
    }

    printlnWithThread("end")

    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, flag1 = true
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, flag1 = true
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, flag1 = true
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, flag1 = true
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 13, name: Thread-0, flag1 = false
    // Wed Feb 22 15:08:52 CST 2023  Thread -> id: 1, name: main, end
}

2、synchronized

var flag2 = true

fun synchronizedFun() {
    printlnWithThread("start")

    Thread {
        flag2 = false
        printlnWithThread("flag2 = $flag2")
    }.start()

    while (flag2) {
        synchronized(flag2) {
            printlnWithThread("flag2 = $flag2")
        }
    }

    printlnWithThread("end")

    // Wed Feb 22 15:08:10 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 15:08:10 CST 2023  Thread -> id: 1, name: main, flag2 = true
    // Wed Feb 22 15:08:10 CST 2023  Thread -> id: 13, name: Thread-0, flag2 = false
    // Wed Feb 22 15:08:10 CST 2023  Thread -> id: 1, name: main, end
}

3、使用线程安全的数据结构 Atomicxxxx、ReentrantLock等

var count = AtomicInteger()
fun atomicFun() = runBlocking(Dispatchers.IO) {
    printlnWithThread("start")
    repeat(100) {
        launch {
            repeat(1000) {
                count.incrementAndGet()
            }
        }
    }
    launch {
        delay(3000)
        printlnWithThread("count = ${count.get()}")
        printlnWithThread("end")
    }

    // Wed Feb 22 15:14:40 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, start
    // Wed Feb 22 15:14:43 CST 2023  Thread -> id: 24, name: DefaultDispatcher-worker-12, count = 100000
    // Wed Feb 22 15:14:43 CST 2023  Thread -> id: 24, name: DefaultDispatcher-worker-12, end
}
var lock = ReentrantLock()
var count2 = 0
fun lockFun() = runBlocking(Dispatchers.IO) {
    printlnWithThread("start")
    repeat(100) {
        launch {
            repeat(1000) {
                lock.lock()
                try {
                    count2++
                } finally {
                    lock.unlock()
                }
            }
        }
    }
    launch {
        delay(3000)
        printlnWithThread("count = $count2")
        printlnWithThread("end")
    }

    // Wed Feb 22 15:17:45 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, start
    // Wed Feb 22 15:17:48 CST 2023  Thread -> id: 30, name: DefaultDispatcher-worker-18, count = 100000
    // Wed Feb 22 15:17:48 CST 2023  Thread -> id: 30, name: DefaultDispatcher-worker-18, end
}

4、协程专属锁 Mutex

(1)它具有 lockunlock 方法,关键的区别在于, Mutex.lock() 是一个挂起函数,它不会阻塞当前线程;
(2)还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock()try { …… } finally { mutex.unlock() } 模式。

var mutex = Mutex()
var count3 = 0
fun mutexFun() = runBlocking(Dispatchers.IO) {
    printlnWithThread("start")
    repeat(100) {
        launch {
            repeat(1000) {
                mutex.withLock {
                    count3++
                }
            }
        }
    }
    launch {
        delay(3000)
        printlnWithThread("count = $count3")
        printlnWithThread("end")
    }

    // Wed Feb 22 15:19:44 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, start
    // Wed Feb 22 15:19:47 CST 2023  Thread -> id: 79, name: DefaultDispatcher-worker-67, count = 100000
    // Wed Feb 22 15:19:47 CST 2023  Thread -> id: 79, name: DefaultDispatcher-worker-67, end
}

5、限制线程(单线程)

val countContext = newSingleThreadContext("countContext")
var count4 = 0
fun singleThreadFun() = runBlocking {
    printlnWithThread("start")
    withContext(countContext) {
        repeat(100) {
            launch {
                repeat(1000) {
                    count4++
                }
            }
        }
        launch {
            delay(3000)
            printlnWithThread("count = $count4")
            printlnWithThread("end")
        }
    }

    // Wed Feb 22 15:21:27 CST 2023  Thread -> id: 1, name: main, start
    // Wed Feb 22 15:21:30 CST 2023  Thread -> id: 13, name: countContext, count = 100000
    // Wed Feb 22 15:21:30 CST 2023  Thread -> id: 13, name: countContext, end
}

6、使用 Actors(后续介绍的热流 Channel,类似于 java 的 BlockingQueue)

(1)一个 actor 是由 协程、 被限制并封装到该协程中的 状态,以及一个与其它协程通信的 通道 组合而成的一个实体;
(2)CoroutineScope.actor() 方法返回的是一个 SendChannel 对象。

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun actorsFun() = runBlocking(Dispatchers.IO) {
    // Channel 管道接收消息并处理
    val counterActor = actor<CounterMsg> {
        var counter = 0
        for (msg in channel) {
            when (msg) {
                is IncCounter -> counter++
                is GetCounter -> msg.response.complete(counter)
            }
        }
    }
    printlnWithThread("start")
    // 发送消息到 Channel 增加计数
    repeat(100) {
        launch {
            repeat(1000) {
                counterActor.send(IncCounter)
            }
        }
    }
    // 延时3s后,发送消息到 Channel 获取计数结果
    launch {
        delay(3000)
        // 发送一条获取值的消息
        val resp = CompletableDeferred<Int>()
        counterActor.send(GetCounter(resp))
        printlnWithThread("counter = ${resp.await()}")
        counterActor.close()
        printlnWithThread("end")
    }

    // Wed Feb 22 15:25:07 CST 2023  Thread -> id: 13, name: DefaultDispatcher-worker-1, start
    // Wed Feb 22 15:25:10 CST 2023  Thread -> id: 77, name: DefaultDispatcher-worker-64, counter = 100000
    // Wed Feb 22 15:25:10 CST 2023  Thread -> id: 77, name: DefaultDispatcher-worker-64, end
}





注:自定义线程信息打印函数如下:

fun printlnWithThread(msg: String = "") {
    val thread = Thread.currentThread()
    println("${Date()}  Thread -> id: ${thread.id}, name: ${thread.name}, $msg")
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容