kotlin进阶—深入理解协程

概念

  • 实际就是kotlin官方提供的线程API,相当于AsyncTask
  • 特性:非阻塞挂起,可挂起/恢复执行
  • 本质:
    协程就是个线程框架
    协程的挂起本质就是线程切出去再切回来
  • 依赖
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.7'

语法

  • CoroutineScope
    可以理解为协程本身,包含了CoroutineContext

  • CoroutineContext
    协程上下文,是一些元素的集合,主要包括Job和CoroutineDispatcher元素,可以代表一个协程的场景

  • CoroutineDispatcher
    协程调度器,可以指定协程运行于特定的一个线程、一个线程池或者不指定,如果不指定,则协程运行于当前线程
    默认有四个值:Dispatchers.Default,Dispatchers.IO,Dispatchers.Main,Dispatchers.Unconfined(不指定)

  • launch创建新的协程
    CoroutineScope.launch是最常用的Coroutine builders
    如果不指定CoroutineDispatcher,默认的协程调度器是Dispatchers.Default,Default是一个协程调度器,指定的协程为共有的线程池,线程数量至少2,最大与CPU数量相同

  • dealy
    delay功能相当于Thread.sleep但是他不会造成线程阻塞,但是Thread.sleep会造成阻塞
    我们看下官方提供的注释

Delays coroutine for a given time without blocking a thread and resumes it after a specified time.

此功能实际只是延迟协程执行,而不会去阻塞线程

  • job和deferred
    job封装了协程中需要执行的代码逻辑,Job可以取消并且有简单的生命周期
    Job完成的时候没有返回值,Deferred是job的子类,并且有返回值
job生命周期
  • runBlocking(通常用于测试)
    创建新的协程同时阻塞当前线程,直到协程结束。
  • withContext
    这个函数用处是切换到指定的线程,并在闭包内的逻辑执行结束之后,自动把线程切回去继续执行
    不会创建新的协程

  • async
    与launch一样的效果,唯一的区别是,async具有返回值

协程和线程

1、协程实现延迟

fun main() {
   GlobalScope.launch {
       delay(1000)
       println("Kotlin")
   }
    println("Hello,")
    Thread.sleep(2000)
    println("World!")
}
  • 1、整个main实际是一个线程
  • 2、delay不会阻塞线程,而是将协程延迟一秒,所以会向下执行线程中的代码打印hello
  • 3、Thread.sleep会阻塞线程,过一秒之后,由于协程延迟时间已过,则会执行协程向下打印,打印出kotlin
  • 4、2秒后,打印出World!

如果Thread.sleep(900)会打印什么?
我们会发现,只打印了Hello,World!。原因是0.9秒之后打印完World!后,线程已经结束,而协程是依附在线程上,所以线程结束,协程也已经结束

2、线程实现

fun main() {
    thread {
        Thread.sleep(1000)
        println("Kotlin")
    }
    println("Hello,")
    Thread.sleep(2000)
    println("World!")
}

运行结果后,我们会发现运行结果和代码一是一样的

如果此时将Thread.sleep(2000)变成Thread.sleep(900)会打印什么?
我们会发现,当我们打印出Hello,World!之后会继续打印Kotlin

3、runBlocking实现

fun main() {

    GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
    runBlocking {
        delay(2000)
    }
    println("World!")
}

上面代码还可以这么写

fun main()= runBlocking {

    GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")

    delay(2000)
    println("World!")
}

结果与上面两个方法一致

4、任务泄漏

  • 当某个协程任务丢失,无法追踪,会导致内存,CPU等资源浪费,甚至发送一个无用的网络请求,这种情况叫做任务泄漏
  • 为了避免协程泄漏,kotlin引入结构化并发机制
  • 结构化并发可以取消任务、追踪任务和发出错误信号

CoroutinueScope

  • 定义协程必须指定其CoroutinueScope,它会用于追踪所有协程,同样它也可以取消由它所启动的协程
  • 常见API:
    • GlobalScope:生命周期是process,即使Activity和Fragment已经被销毁,协程仍然在执行
    • MainScope:Activity中使用,可以在onDestory中取消协程
    • viewModelScope:只能在viewModel中使用,绑定ViewModel的生命周期
    • lifecyScope:只能在Activity、Fragment中使用,会绑定activity和Fragment的生命周期

协程的启动和取消

启动和协程

launch源码

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch实际返回的job对象,而job中有个join方法。注意这里的block是suspend方法,而suspend方法只有suspend 函数或协程可被调用

join的源码

    /**
     * Suspends the coroutine until this job is complete. This invocation resumes normally (without exception)
     * when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
     * This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
     */
    public suspend fun join()

join的作用就是会挂起协程直到任务完成,随后会恢复正常

fun main() = runBlocking {

    val myJob = GlobalScope.launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
    myJob.join()
    println("World!")

}

流程:首先会走launch,因为delay了1秒,所以走线程,打印Hello,此时继续向下执行遇到了join,join的作用是会等到launch方法执行完毕后,再向下执行

runBlocking+launch的使用

fun main() = runBlocking {

    launch {
        delay(1000)
        println("Kotlin")
    }
    println("Hello,")
}

我们会发现:先打印Hello,后打印Kotlin,而不是像之前协程实现延迟一样,打印Hello就结束了。这是因为每一个协程构建器都会向其代码块作用域添加一个CoroutinueScope实例,我们可以在该 作用域启动协程,而无需显示将其join到一起,理由是外部协程(就是上面实例中的runBlocking)会等待所有启动协程全部完成后才会完成

而GlobalScope.launch会退出的原因是GlobalScope是守护线程

coroutineScope
等待所有子协程完成其任务,但并不会阻塞当前的线程

fun main()= runBlocking{
     launch {
         delay(1000)
         println("launch 1")
     }
    println("main 1")
    coroutineScope {
        launch {
            delay(3000)
            println("coroutineScope 1")
        }
        delay(500)
        println("coroutineScope 1")
    }
    println("main 2")
}

结果:
main 1
coroutineScope 1
launch 1
coroutineScope 1
main 2

coroutineScope是挂起函数,也就是说当前协程会被挂起,那么coroutineScope函数也会被挂起。此时它创建的对象runBlocking函数会继续执行之前的代码,但是下面的代码必须等待挂起函数coroutineScope执行完毕之后才会继续执行

coroutineScope和supervisorScope

  • coroutineScope:一个协程失败,所有兄弟协程也会被取消
  • supervisorScope:一个协程失败,不会影响到其他兄弟协程
协程的启动模式
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,//👈🏻协程的启动模式
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • Default:协程创建后,立即开始调度,在调度前如果协程被取消,其将进入取消响应的状态
  • ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消
  • Lazy:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么协程直接进入异常结束状态
  • UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点
挂起suspend
  • 挂起的对象是协程
  • 协程其实就是launch 或者 async 函数这两个函数中闭包的代码块
  • 在执行到某一个 suspend 函数的时候,这个协程会被suspend,也就是被挂起。
  • 协程从当前线程挂起。换句话说,就是这个协程从正在执行它的线程上脱离
  • 被suspend修饰的函数只能被有suspend修饰的函数调用或者协程调用
全局协程 GlobalScope

全局协程类似守护线程
使用 GlobalScope启动的活动协程并不会保证进程的生命, 会随着线程销毁而销毁

fun main(){
  GlobalScope.launch {
      repeat(100){
          println("111")
          delay(500)
      }
  }
  Thread.sleep(2000)
}
协程的取消和超时
  • kotlinx.coroutines包下的所有挂起函数都是可以取消,它们会检查协程的取消状态,当取消的时候就会抛出CancellationException异常
  • 如果协程正在进行计算过程中,并且没有检查取消状态,则无法被取消
fun main() = runBlocking{
   val currentTimeMillis = System.currentTimeMillis()
   val job = launch {
       var I=0
       var startTime=currentTimeMillis
       while (i<20) {
          if(System.currentTimeMillis()>=startTime){
              println("launch${i++}")
             startTime+=500L
          }
       }
   }
   delay(1300)
   println("main")
   job.cancelAndJoin()
   println("end")
}

结果:


image.png

上面的代码协程实际并没有执行取消

如何让计算代码变为可取消

  • 周期性的调用一个周期性挂起函数,该挂起函数会检查是否取消状态,比如yield
  • 显示的检查取消状态
fun main() = runBlocking{
    val currentTimeMillis = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var I=0
        var startTime=currentTimeMillis
        while (isActive) {
           if(System.currentTimeMillis()>=startTime){
               println("launch${i++}")
              startTime+=500L
           }
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}
  • isActive是一个可以被使用在CorcountineScope中的扩展属性,检查Job是否处于活跃状态
  • ensureActive():如果 Job处于活跃状态,这个方法会立即抛出异常
  • yield函数会检查所在协程的状态,如果已被取消,则抛出CancellationException予以响应。

finally资源清理
join和cancelAndjoin都会等待清理动作完成才会继续往下执行

fun main() = runBlocking{
    val job = launch() {
        try {
       repeat(100){
           println("launch$it")
           delay(500)
       }
        }finally {
          println("执行清理动作")
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

finally之后表示协程已经真正结束被取消了,此时如果使用挂起函数,会导致CancellationException异常

fun main() = runBlocking {
    val job = launch() {
        try {
            repeat(100) {
                println("launch$it")
                delay(500)
            }
        } finally {
            println("执行清理动作")
            delay(1000)
            println("真的结束了吗")//并没有被打印
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

上面代码结果并没有打印"真的结束了吗",原因是:此时的协程已经被取消了。在少数情况下,当我们一个取消的协程中进行挂起函数,怎么办呢?可以使用withContext(NonCancellable) {}

fun main() = runBlocking {
    val job = launch() {
        try {
            repeat(100) {
                println("launch$it")
                delay(500)
            }
        } finally {
            withContext(NonCancellable) {
                println("执行清理动作")
                delay(1000)
                println("真的结束了吗")
            }
        }
    }
    delay(1300)
    println("main")
    job.cancelAndJoin()
    println("end")
}

结果:


image.png

withTimeout和withTimeoutOrNull
withTimeout和withTimeoutOrNull都可以解决超时的问题,区别在于:withTimeoutOrNull在控制台不会抛出异常,而是直接返回null,但是和withTimeout会在控制台抛出异常

fun main() = runBlocking {
    withTimeout(1300) {
        repeat(100) {
            println("launch$it")
            delay(500)
        }
    }
}
fun main() = runBlocking {
    val result = withTimeoutOrNull(1300) {
        repeat(100) {
            println("launch$it")
            delay(500)
        }
    }
    println(result)
}
async/await:Deferred实现并发

用于执行协程任务,并得到执行的结果,结果值是Deferred,这是一个轻量级非阻塞的future,可以在稍后提供一个结果值,可以通过.await()方法获取最终的值

fun main() = runBlocking {
   val job= async {
        delay(1000)
        return@async "Kotlin"
    }
    println("Hello,")
    println(job.await())
    println("end")
}
  • 通过async方法的start参数设置为CoroutinueStart.Lazy来实现协程的延迟执行,在这种情况,协程有两种方法去执行
    • 调用Deferred的await方法
    • 调用job的start 方法
fun main() = runBlocking {
    val time = measureTimeMillis {
        val a = async(start = CoroutineStart.LAZY) { A() }
        val b = async(start = CoroutineStart.LAZY) { B() }
        println("start")
        Thread.sleep(3000)
        a.start()
        b.start()
        println("${a.await()}+${b.await()}=${a.await() + b.await()}")
    }
    println("时间是$time")
}

private suspend fun A(): Int {
    delay(2000)
    return 2
}

private suspend fun B(): Int {
    delay(3000)
    return 4
}

注意:start是并行,如果没有start而是await则是串行

父子异常取消和异常
fun main() = runBlocking<Unit> {
    try {
        failureSum()
    } catch (e: Exception) {
        println("failureSum failure")
    }
}

private suspend fun failureSum(): Int = coroutineScope {
    val value = async {
        try {
            delay(2000)
            50
        } finally {
            println("value was cancel")
        }
    }
    val failure = async<Int> {
        Thread.sleep(1000)
        println("出现异常了")
        throw Exception()
    }
    value.await() + failure.await()
}

结果:

出现异常了
value was cancel
failureSum failure

Process finished with exit code 0

协程上下文和分发器

  • 协程总会在某个上下文中执行,这个上下文实际是由CoroutineContext类型的一个实例来表示的,这个实例实际是由kotlin标准库定义的
  • 协程上下文本质是各种元素所构成的一个集合。其中主要元素包括job,以及分发器
  • 所谓的分发器,其主要功能是确定由哪个线程来执行我们所指定的协程代码
  • 协程上下文包含了一个协程分发器(CoroutineDispatcher),协程分发器确定了到底由哪个线程或者线程池来执行我们指定的协程,协程分发器可以将协程执行限制到一个具体的线程,也可以将协程的执行分发到一个线程池中,由线程池中的某个线程来执行所指定的协程,还可以不加任何限制执行我们所指定的协程代码
fun main() = runBlocking<Unit>{
     launch {
         println("no params,thread:${Thread.currentThread().name}")
     }
    launch(Dispatchers.Unconfined) {
        println("Dispatchers.Unconfined,thread:${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        println("Dispatchers.Default,thread:${Thread.currentThread().name}")
    }
    val thread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    launch(thread) {
        println("SingleThreadExecutor,thread:${Thread.currentThread().name}")
        thread.close()
    }
    GlobalScope.launch {
        println("GlobalScope.launch,thread:${Thread.currentThread().name}")
    }
}

程序分析

  • 1、当通过launch来启动协程并且不指定协程分发器的时候,它会继承启动他的那个CoroutineScope的上下文与分发器,对于该例子,它会继承runBlocking的上下文,而runBlocking则是运行在main线程
  • 2、Dispatchers.Unconfined是一种特殊的协程分发器,其运行机制与不指定的协程分发器时时完全不同,在日常中很少被使用
  • 3、Dispatchers.Default是默认的分发器,当协程是通过GlobalScope来启动的时候,它会使用该默认的分发器来启动协程,它会使用一个后台的共享线程池来运行代码,因此,Dispatchers.Default实际等价于GlobalScope.launch { }
  • 4、Executors.newSingleThreadExecutor().asCoroutineDispatcher()创建一个单线程的线程池,该线程池中的线程用来执行我们所指定的协程代码,在实际的开发过程中,使用专门的线程来执行协程代码的代价是非常高的,因为协程执行完之后,需要释放资源。如示例中的close方法
协程上下文继承

协程上下文=默认值+继承的CoroutineContext+参数

  • 一些元素包含默认值:Dispatchers.Default是默认的CoroutineDispatcher,以及"coroutine"作为协程的CoroutineName
  • 继承的CoroutineContext是CoroutineScope或者其父协程的CoroutineContext
  • 传入协程构建器的参数的优先级高于继承的上下文参数,因此会覆盖对应的参数
fun main() {
    runBlocking {
        val coroutineScope = CoroutineScope(Job() + Dispatchers.Main + CoroutineName("peakmain"))
        val launch = coroutineScope.launch(Dispatchers.IO) {
            println(Thread.currentThread().name)//👈🏻DefaultDispatcher-worker-1
        }
        launch.join()
    }
}

异常的传播

根协程的异常

  • 协程构建器有两种形式:⾃动传播异常(launch 与 actor)或向⽤⼾暴露异常(async 与 produce)
  • async依赖⽤⼾来最终消费异常,如通过await
  • launch视为未捕获异常,类似Java的Thread.uncaughtExceptionHandler
fun main() {
    runBlocking {
        val job = GlobalScope.launch {//根协程
            println("Global的异常")
            throw NullPointerException()//控制台打印
        }
        job.join()
        println("joined failed job ")
        val async = GlobalScope.async {//根协程
            println("async 的异常")
            throw IndexOutOfBoundsException()//控制台不打印
        }
        try {
            async.await()
            println("exception")//不打印
        } catch (e: Exception) {
            println("IndexOutOfBoundsException")//打印
        }

    }
}

非根协程的异常

  • 其他协程所创建的协程中,产生的异常总会被传播
fun main() {
    runBlocking {

        val coroutineScope = CoroutineScope(Job())
        val launch = coroutineScope.launch {
            async {
                println("async 的异常")
                throw IndexOutOfBoundsException()//控制台不打印
            }
        }
        launch.join()
    }
}

如果async抛出异常,launch就会立即抛出异常,而不会等到调用.await

异常的传播性
当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给他的父级。接下来,父级会做以下几件事情:

  • 取消它自己的子级
  • 取消自己
  • 将异常传播并传递给自己的父级

SupervisorJob

  • SupervisorJob一个子协程的失败不影响到其他子协程。SupervisorJob不会传播异常给它的父级,它会让子协程自己处理异常
fun main() {
    runBlocking {

        val job = CoroutineScope(SupervisorJob())
        val launch1 = job.launch {
           delay(100)
            println("launch1")
            throw NullPointerException("launch1 exception")
        }
        val launch2 = job.launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("launch2 finished")
            }
        }
        joinAll(launch1,launch2)
    }
}

CoroutineExceptionHandler

  • CoroutineExceptionHandler相当于Thread.uncaughtExceptionHandler
  • 需要安装在外部(根)协程上
fun main() {
    runBlocking {
        val handler = CoroutineExceptionHandler { _, exception ->
            println(exception.message)
        }
        val coroutineScope = CoroutineScope(Job())
        val launch = coroutineScope.launch(handler) {
            launch {
                throw IllegalAccessError("error is null")
            }
        }
        launch.join()
    }
}

Android获取全局异常处理

  • 异常处理器可以获取到所有协程未处理的未捕获异常,不过不能阻止异常崩溃,但是在调试和异常上报等场景有非常大的用处
  • src/main目录下新建目录resources/META-INF/services,再在其的目录下创建文件,kotlinx.coroutines.CoroutineExceptionHandler,其内容是自己创建的全局异常处理器全类名,例如我的
com.peakmain.project.GlobalCoroutinueExceptionHandler
class GlobalCoroutinueExceptionHandler() :CoroutineExceptionHandler {
    override val key = CoroutineExceptionHandler
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        Log.e("TAG","异常信息:${exception.message}")
    }
}

取消和异常

  • 取消和异常紧密相关,协程内部使用CancellationException来进行取消,这个异常会被忽略
  • 当子协程取消时,不会取消它的父协程
  • 如果一个协程遇到CancellationException以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后,异常才会被父协程处理

Flow-异步流

  • 名为flow的Flow类型构建器函数
  • flow{...}构建块中的代码可以挂起
  • 函数simple不再标有suspend修饰符
  • 流使用emit发送值
  • 流使用collect函数收集值

通过flow异步返回多个值

fun main() {
    runBlocking {
        simpleFlow().collect{
            println(it)
        }
    }
}
suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i)//发送,产生一个元素
    }
}

冷流/热流

  • 冷流:flow构建器中的代码直到流被收集的时候才运行
  • 通过 flow{ ... } 返回的就是冷流同时是同步的
  • 通过 channelFlow返回的是热流同时是异步的

流的连续性

  • 流的每次单独收集都是按顺序执行的,除非使用特殊操作符
  • 从上游到下游每个过度操作符都会处理每个发射出来的值,然后交给末端操作符
fun main() {
    runBlocking {
        (1..100).asFlow().filter {
            it%2==0
        }.map {
            "偶数:$it"
        }.collect {
            println(it)
        }
    }
}

流构建器

  • flowOf构建器定义了一个发送固定值集的流
  • 使用.asFlow()扩展函数,可以将各种集合和序列转成流
       flowOf(1,2,3,4,5).onEach {
           delay(1000)
       }.collect {
           println(it)
       }

流上下文

  • 流收集总是在调用协程上下文中发生,流的属性称为上下文保存
  • flowOn用于更改流发射的上下文
fun main() {
    runBlocking {
        flow<Int> {
            println(Thread.currentThread().name)
            for (i in 1..10){
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .collect {
                println("$it=${Thread.currentThread().name}")//👈🏻下游的线程由整个flow运行的CoroutineContext决定
            }
    }
}

指定协程收集流

  • onEach收集流
  • launchIn指定onEach收集流的线程
fun main() {
    runBlocking {
        flow<Int> {
            println("main:${Thread.currentThread().name}")
            for (i in 1..10){
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .onEach { println(Thread.currentThread().name) }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()

    }
}

flow取消

fun main() {
    runBlocking {
        val job = flow<Int> {
            println("main:${Thread.currentThread().name}")
            for (i in 1..10) {
                delay(1000)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//相当于切线程,将上游放到IO线程处理
            .onEach { println(Thread.currentThread().name) }
            .launchIn(CoroutineScope(Dispatchers.IO))
        delay(2000)
        job.cancel()
        job.join()

    }
}

flow取消检测

  • 通过ensureActive函数来检测该协程是否被取消了
fun main() {
    runBlocking {
        (1..10).asFlow().collect {
            println(it)
            if(it==4){
                cancel()
            }
        }
    }
}

结果:

1
2
3
4
5
6
7
8
9
10
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7e0b37bc

我们发现取消失效了

  • 必须明确检测是否取消,通过cancellable操作符来操作
fun main() {
    runBlocking {
        (1..10).asFlow()
            .cancellable().collect {
                println(it)
                if (it == 4) {
                    cancel()
                }
            }
    }
}

通道

  • 延期的值提供了⼀种便捷的⽅法使单个值在多个协程之间进⾏相互传输。通道提供了⼀种在流中传输值的⽅法。

协程之间通信

fun main() {
    runBlocking {
        val channel= Channel<Int>()
        launch {
            var i=0
            while (true){
                delay(500)
                channel.send(++i)
                println("发送:$i")
            }
        }
        launch {
             while (true){
                 val element=channel.receive()
                 println("receive $element")
             }
        }

    }
}

容量

  • Channel实际是一个队列,队列中一定存在缓冲区,那么一旦这个缓存区满了,并且一直没有调用receive取走函数,send就需要挂起。所以send经常会被挂起,直到receive才会继续走下去

迭代

  • Channel本身像序列,所以在读取的时候,可以直接获取一个Channel的Iterator
fun main() {
    runBlocking {
        val channel = Channel<Int>(Channel.UNLIMITED)//设置容量
        launch {
            for (x in 0..5) {
                channel.send(x)
                println("发送:$x")
            }
        }
        launch {
            val iterator = channel.iterator()
            while (iterator.hasNext()) {
                val element = channel.receive()
                println("receive $element")
                delay(1000)
            }
        }

    }
}

原理

挂起原理

suspend fun A():String{
   delay(2*1000)
   println("delay after")
   return "I am Peakmain"
}
//反编译之后的代码
public final class TestKt {
  @Nullable
// $completion实际是个回掉方法
  public static final Object A(@NotNull Continuation $completion) {
     Object $continuation;
     label20: {
        $continuation = new ContinuationImpl($completion) {
           Object result;
           int label;

           @Nullable
           public final Object invokeSuspend(@NotNull Object $result) {
              this.result = $result;
              this.label |= Integer.MIN_VALUE;
    //系统自己会调用一次自己
              return TestKt.A(this);
           }
        };
     }
     Object $result = ((<undefinedtype>)$continuation).result;
    // 这里的var5实际是COROUTINE_SUSPENDED
     Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
     switch(((<undefinedtype>)$continuation).label) {
     case 0:
        ResultKt.throwOnFailure($result);
    //label修改成1
        ((<undefinedtype>)$continuation).label = 1;
      //DelayKt.delay会回掉ContinuationImpl的invokeSuspend
        if (DelayKt.delay(2000L, (Continuation)$continuation) == COROUTINE_SUSPENDED) {
       //return之后delay方法就不会向下执行
           return var5;
        }
        break;
     case 1:
        ResultKt.throwOnFailure($result);
        break;
     default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
     }

     String var1 = "delay after";
     boolean var2 = false;
     System.out.println(var1);
     return "I am Peakmain";
  }
}

DelayKt.delay源码分析

public final class DelayKt {
  public static final Object delay(long timeMillis, @NotNull Continuation $completion) {//$completion回掉函数
     if (timeMillis <= 0L) {
        return Unit.INSTANCE;
     } else {
        int $i$f$suspendCancellableCoroutine = false;
        int var5 = false;
        CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellable$iv.initCancellability();
        CancellableContinuation cont = (CancellableContinuation)cancellable$iv;
        int var8 = false;
     //getDelay实际是DefaultExecutor
        getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        Object var10000 = cancellable$iv.getResult();
        if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
           DebugProbesKt.probeCoroutineSuspended($completion);
        }
     //主要看返回结果就可以了
        return var10000;
     }
  }
}
   internal fun getResult(): Any? {
       installParentCancellationHandler()
       if (trySuspend()) return COROUTINE_SUSPENDED
        ...
       return getSuccessfulResult(state)
   }
   private fun trySuspend(): Boolean {
       _decision.loop { decision ->
           when (decision) {
            //默认是UNDECIDED
               UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
               RESUMED -> return false
               else -> error("Already suspended")
           }
       }
   }

我们从上面代码可以知道,挂起的本质实际是return,suspend的作用实际是添加一个回调

恢复
协程的核心本质是挂起-恢复,而挂起-恢复的本质实际是return+callback

回到DelayKt.delay中的 scheduleResumeAfterDelay源码
上面我们知道getDelay实际是DefaultExecutor,而DefaultExecutor继承于EventLoopImplBase

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                continuation.disposeOnCancellation(task)
                schedule(now, task)
            }
        }
    }
    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }
   private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                if (firstTask == null) {
                    delayed.timeNow = now
                } else {
          
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
      
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

scheduleTask的作用将对象DelayedTask添加到队列中,并返回SCHEDULE_OK,回到schedule源码中

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            //上面代码分析此时实际回到的是这里
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        }
    }
    protected actual fun unpark() {
        val thread = thread // atomic read
        if (Thread.currentThread() !== thread)
            unpark(thread)
    }
//thread实际是DefaultExector的
    override val thread: Thread
        get() = _thread ?: createThreadSync()
    @Synchronized
    private fun createThreadSync(): Thread {
        return _thread ?: Thread(this, THREAD_NAME).apply {
            _thread = this
            isDaemon = true
            start()
        }
    }
  const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

upark实际最终走到线程的unpark源码

@InlineOnly
internal inline fun unpark(thread: Thread) {
    timeSource?.unpark(thread) ?: LockSupport.unpark(thread)
}

也就是说最终实际调用LockSupport.unpark去唤醒线程,而我们又知道Thread实际是DefaultExecutor,而DefaultExecutor实现了Runnable接口

    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                //是否可被打断
                Thread.interrupted() 
                var parkNanos = processNextEvent()
                if (parkNanos > 0) {
                 
                    if (isShutdownRequested) return
                //实际走到的是线程的LockSupport. parkNanos
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }

实际是就是LockSupport的park和unpack,回到scheduleResumeAfterDelay的DelayedResumeTask的run方法

    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
     //cont是我们的回调
        override fun run() { with(cont) { resumeUndispatched(Unit) } }
        override fun toString(): String = super.toString() + cont.toString()
    }
    override fun CoroutineDispatcher.resumeUndispatched(value: T) {
        val dc = delegate as? DispatchedContinuation
        resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
    }
    private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
        _state.loop { state ->
            when (state) {
                is NotCompleted -> {
                    if (!_state.compareAndSet(state, proposedUpdate)) return@loop
                    dispatchResume(resumeMode)
                    return null
                }
           
        }
    }
    private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        dispatch(mode)
    }
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
    //......
    } else {
        resume(delegate, mode)
    }
}
 delegate.resumeMode(getSuccessfulResult(state), useMode)
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
    when (mode) {
        MODE_ATOMIC_DEFAULT -> resume(value)
        MODE_CANCELLABLE -> resumeCancellable(value)
        MODE_DIRECT -> resumeDirect(value)
        MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
        MODE_IGNORE -> {}
        else -> error("Invalid mode $mode")
    }
}
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

实际会走到Continuation.resumeWith方法,而它的实现类实际是BaseContinuationImpl的.resumeWith方法

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                     //走到我们实现类的ContinuationImpl的invokeSuspend
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminatin
            }
        }
    }

总结:
1、suspend实际是编译后添加一个Continuation的回掉
2、内部会创建一个ContinuationImpl方法并实现invokeSuspend方法
3、挂起实际是return了
4、恢复最终实际会调用ContinuationImpl. invokeSuspend方法,之后再次调用自己创建的类

恢复原理逆向还原

ublic class Test {

    public static final Object A(Continuation continuation) {
        ContinuationImpl callback;
        if(!(continuation instanceof ContinuationImpl)&&(((ContinuationImpl) continuation).lable==0)){
            callback = new ContinuationImpl(continuation) {
                @Override
                Object invokeSuspend(@NotNull Object result) {
                    this.result = result;
                    this.lable |= Integer.MIN_VALUE;
                    return A(this);
                }
            };
        }else{
            callback= (ContinuationImpl) continuation ;
        }
        switch (callback.lable){
            case 0:
                Object delay = DelayKt.delay(2000, callback);
                if(delay== IntrinsicsKt.getCOROUTINE_SUSPENDED()){
                    return IntrinsicsKt.getCOROUTINE_SUSPENDED();
                }
                break;
        }
        String var1 = "delay after";
        System.out.println(var1);
        return "I am peakmain";
    }

    abstract static class ContinuationImpl<T> implements Continuation<T> {
        private Continuation mCallback;
        int lable;
        Object result;

        public ContinuationImpl(Continuation callback) {

            mCallback = callback;
        }

        @NotNull
        @Override
        public CoroutineContext getContext() {
            return mCallback.getContext();
        }

        @Override
        public void resumeWith(@NotNull Object result) {
            Object suspend = invokeSuspend(result);
            if(suspend==IntrinsicsKt.getCOROUTINE_SUSPENDED())return;
            mCallback.resumeWith(result);

        }

        abstract Object invokeSuspend(@NotNull Object result);
    }
}

协程的挂起恢复本质是return+callback

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容