安卓 协程

协程: 协程最核心的点就是,函数或者一段程序能够被挂起,稍后再在挂起的位置恢复。挂起的函数要在协程里面执行。
特点: 1、更加轻量级,占用资源更少。
2、避免“回调地狱”,增加代码可读性。
3、协程的挂起不阻塞线程。
1、协程的挂起与恢复
suspend- 挂起函数不会阻塞当前线程。当一个挂起函数被调用时,它会暂停协程的执行,但不会阻塞底层线程。这意味着即使你在主线程上调用挂起函数,也不会使主线程陷入等待状态。
resume -用于让已暂停的协程从其暂停处继续执行。
挂起函数只能在协程体内或其他挂起函数内调用

tv?.setOnClickListener {
            GlobalScope.launch(Dispatchers.Main) {
                //挂起,点击N次响应N次
                delay(4000)
                Log.i("jia", "-----")
            }
            //点击一次无效因为阻塞了, 点击N次也是响应一次而且会崩溃
            Thread.sleep(4000)
            Log.i("jia", "=====")
        }

阻塞: 等98号技师,98号在忙,中间不干其他的,就一直等直到她结束
挂起: 等98号技师,98号在忙,前台记下来,先看电影或者干其他的,结束后直接服务你
2、调度器
所有协程必须在调度器中运行,即使它们在主线程上运行也是如此。
Dispatchers.Main Android 上的主线程
用来处理UI交互和一些轻量级任务:调用suspend函数、调用UI函教、更新LiveData
Dispatchers.lO 非主线程
转为磁盘和网络IO进行优化,用来处理数据库、文件、读写网络处理
Dispatchers.Default 非主线程
专为CPU密集型任务进行了忧化:数组排序、JSON数据解析
3、协程作用域
定义协程必须指定其CoroutineScope,它会跟踪所有协程,同样它还可以取消由它所启动的所有协程
常用的相关API有:
GlobalScope,生命周期是process级别的,即使Activity或Fragment已经被销毁,协程仍然在执行。
MainScope,在Activity中使用,可以在onDestroy()中取消协程。默认运行在主线程。
viewModelScope,只能在ViewModel中使用,自动管理协程的生命周期,绑定ViewModel的生命周期。
lifecycleScope,只能在Activity、Fragment中使用,自动管理协程的生命周期,确保在生命周期结束时取消协程。会绑定Activity和Fragment的生命周期。
CoroutineScope:适用于需要手动管理协程生命周期的场景,可以在 Activity 或 Fragment 中使用,但需要手动取消协程。
SupervisorScope :子协程独立运行,适用于多个协程同时运行的场景。
4、协程的启动模式
DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态。
类似滴滴打车,立即开始调度不一定立马有车,需要等待车来,但是可以在车来之前取消。
ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消。
LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。join await 都是挂起函数,不会阻塞主线程。
UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。

      //如何使用Dispatchers.IO调度器,执行仍然在主线程就是用Coroutinestart.UNDISPATCHED
        val job =GlobalScope.async(context=Dispatchers.IO,start=Coroutinestart.UNDISPATCHED){
            println("thread:"+Thread.currentThread().name)
        }

5、协程的作用域构建器 coroutineScope与runBlocking
runBlocking是常规函数,而coroutineScope是挂起函数。
它们都会等待其协程体以及所有子协程结束,主要区别在于runBlocking方法会阻塞当前线程来等待,而coroutineScope只是挂起,会释放底层线程用于其他用途。
coroutineScope与supervisorScope
coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消。
supervisorScope:一个协程失败了,不会影响其他兄弟协程。

    suspend fun 将main() = runBlocking {
        coroutineScope { //这里换成supervisorScope,子协程1就会执行
            val job1 = launch {
                delay(5000)
                println("子协程1")
            }
            val job2 = launch {
                delay(1000)
                println("子协程2")
                throw Exception()   //子协程1不会执行
            }
        }
    }

6、Job对象
对于每一个创建的协程((通过launch或者async,会返回一个Job实例,该实例是协程的唯一标示,并且负责管理协程的生命周期。
一个任务可以包含一系列状态:新创建(New)、活跃(Active),完成中(Completing)、已完成(Completed),(取消中(Cancelling)和已取消(Cancelled)虽然我们无法直接访问这些状态,但是我们可以访问Job的属性: isActive、isCancelled和iscompleted。
job.join
会挂起调用该函数的协程,直到目标协程执行完毕。这样可以确保当前协程不会继续执行后续的代码,直到与Job关联的协程已经结束。join函数是非阻塞的,即使它会挂起当前协程,它不会阻塞底层线程。因此,它在并发编程中非常有用,可以用来控制协程的执行顺序,确保某些任务在其他任务之前完成。
job.cancel() 的作用是请求取消与该 Job 对象关联的协程,但是不会立马生效 。
具体来说,当你调用 job.cancel() 时,会向协程发送一个取消信号,请求它停止执行。然而,这并不意味着协程会立即终止,因为协程的取消是协作性的。换句话说,协程必须在合适的时机检查取消状态并进行响应。通常协程会通过挂起函数(如 delay 或 yield)来检测是否被取消,并在取消时抛出一个 CancellationException。
7、CPU密集型任务取消
isActive是一个可以被使用在CoroutineScope中的扩展属性,检查Job是否处于活跃状态。
ensureActive(),如果job处于非活跃状态,这个方法会立即抛出异常。
yield函数会检查所在协程的状态,如果已经取消,则抛出CancellationException予以响应。此外,它还会尝试出让线程的执行权,给其他协程提供执行机会。
处于取消中状态的协程不能够挂起(运行不能取消的代码),当协程被取消后需要调用挂起函数,我们需要将清理任务的代码放置于 NonCancellable CoroutineContext 中

  fun yield() = runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5&& isActive) {//增加isActive状态 来取消任务
                if (System.currentTimeMillis() >= nextPrintTime) {
//                    ensureActive()//下面这两种方式都可以取消任务
//                    yield()
                    println("job:I'm sleeping ${i++}。。。1")
                    nextPrintTime += 500
                }
            }
            try {
                delay(10000)
                println("job:I'm sleeping")
            }finally {
                println("job:I'm finally")
                withContext(NonCancellable){
                    //NonCancellable使用后,即使上述任务取消了,也不影响下面的执行
                    delay(1300)
                    println("job:I'm back")
                }
            }
        }
        delay(1300)
        println("main:I'm tired of waiting!")
        job.cancelAndJoin()
        println("main:Now Ican guit.")
    }

8、超时任务
很多情况下取消一个协程的理由是它有可能超时。
withTimeoutOrNull 通过返回null来进行超时操作,从而替代抛出一个异常。
9、SupervisorJob
使用SupervisorJob时,一个子协程的运行失败不会影响到其他子协程。Supervisorjob不会传播异常给它的父级,它会让子协程自己处理异常。
这种需求常见于在作用域内定义作业的UI组件,如果任何一个U的子作业执行失败了,它并不总是有必要取消整个UI组件,但是如果UI组件被销毁了,由于它的结果不再被需要了,它就有必要使所有的子作业执行失败。
supervisorScope:当作业自身执行失败的时候,所有子作业都会被取消。
10、异常捕获
使用CoroutineExceptionHandler对协程的异常进行捕获。以下的条件被满足时,异常就会被捕获:。
时机:异常是被自动抛出异常的协程所抛出的(使用launch,而不是 async 时);
位置:在CoroutineScope的CoroutineContext中或在一个根协程(CoroutineScope 或者 supervisorScope 的直接子协程)中。
当子协程被取消时,不会取消它的父协程,
如果一个协程遇到了CancellationException以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后,异常才会被父协程处理。

 fun catchHandle() = runBlocking<Unit> {
        val handler= CoroutineExceptionHandler { coroutineContext, exception ->
           println("catch"+exception)
        }
        val job=GlobalScope.launch(handler) {
            throw AssertionError() //会被捕获
            launch(handler) {
                throw AssertionError() //此内部协程的方式不会被捕获
            }
        }
        val job2=GlobalScope.async(handler)  {
            throw ArithmeticException()//不会被捕获
        }
        job.join()
        job2.await()
    }

常见异常捕获方式

val handler= CoroutineExceptionHandler { coroutineContext, exception ->
            println("catch"+exception)
        }
        tv?.setOnClickListener {
            GlobalScope.launch(handler) {
                Log.i("jia", "onclick")
                "abc".substring(10)//必闪退 ,但是加了异常捕获,就不会闪退了,还会打印闪退信息
            }
        }

11、Flow 与其他方式的区别
名为flow的Flow类型构建器函数。
flow{.. }构建块中的代码可以挂起。
函数simpleFlow不再标有suspend修饰符。
流使用emit函数发射值。
流使用collect函数收集值。

//返回多个值,是异步的
fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)//假装在一些重要的事情
        emit(i)//发射,产生一个元素
    }
}
fun collectFlow()= runBlocking<Unit> {
    simpleFlow().collect { value ->
        println(value)
    }
}

12、冷流
冷流:顾名思义冷启动,临阵磨枪
①、Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

 fun collectFlow()= runBlocking<Unit> {
        val flow =simpleFlow()
        printIn("Calling collect...")
        flow.collect{value ->printIn(value)}
        println("Calling collect again...")
        flow.collect{value ->printIn(value)}
    }

②、流的每次单独收集都是按顺序执行的,除非使用特殊操作符.
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符.
下列打印结果:
Collect string 2
Collect string 4

    fun collectFlow() = runBlocking<Unit> {
        //过滤偶数并将其映射到字符串
        (1..5).asFlow().filter {
            it % 2 == 0
        }.map {
            "string $it"
        }.collect {
            printIn("Collect $it")
        }
    }

③、flowOf构建器定义了一个发射固定值集的流。
使用.asFlow()扩展函数,可以将各种集合与序列转换为流。

 fun collectFlow() = runBlocking<Unit> {
        flowOf("one", "two", "three")
            .onEach { delay(1000) }
            .collect { value -> printIn(value) }

        (1..3).asFlow().collect { value ->
            println(value)
        }
    }

④、流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
flow.{...}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
flowOn操作符,该函数用于更改流发射的上下文。

   fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000)//假装在一些重要的事情
            emit(i)//发射,产生一个元素
        }
    }.flowOn(Dispatchers.Default)

⑤、使用launchIn替换collect我们可以在单独的协程中启动流的收集。
onEach 它允许你在 Flow 的每个元素被收集时执行一个副作用(side effect)。简单来说,它可以在每个元素通过数据流时对其应用某种处理,但不会改变这些元素本身或影响流的正常传播。
onEach 提供了一种优雅的方式来为 Flow 添加额外的行为,而不必直接修改流的内容或其传播方式。

//事件源
    fun events() = (1..3)
        .asFlow()
        .onEach { delay(1000) }
        .flowOn(Dispatchers.Default)
    fun collectFlow5() = runBlocking<Unit> {
        events().onEach { event -> println("Event: $event ${Thread.currentThread().name})") }
//            .collect()
            .launchIn(CoroutineScope(Dispatchers.IO))
    }

⑥、为方便起见,流构建器对每个发射值执行附加的 ensureActive 检测以进行取消的这意味着从 flow{..}发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。
⑦、背压:背压(Backpressure)是指当生产者(数据生成方)生成数据的速度快于消费者(数据处理方)处理数据的速度时,系统需要采取的一种机制来处理这种不匹配的情况。背压机制允许生产者暂时停止生成数据,直到消费者能够跟上处理速度,从而避免数据丢失或系统崩溃。
buffer(),并发运行流中发射元素的代码。
conflate(),合并发射项,不对每个值进行处理。
collectLatest(),取消并重新发射最后一个值。
当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文。

fun collectFlow6() = runBlocking<Unit> {
        simpleFlow()
            .buffer(50)//并发运行流中发射元素的代码。
            .conflate()//合并发射项,不对每个值进行处理。
            .collect { value ->
            delay(3000)
            println(value)
        }
    }

⑧、转换转换符

 suspend fun performRequest(request: Int): String {
        delay(1000)
        return "response $request"
    }

    fun simpleFlow() = flow<Int> {
        (1..3).asFlow()
            .map { performRequest(it) }
            .collect { value ->
                println(value)
            }
        (1..3).asFlow()
            .transform {
                emit("Making request $it")
                emit(performRequest(it))
            }
            .collect { value ->
                println(value)
            }
    }

限长操作符 take(2),表示最多接受两条数据。
组合操作符:就像Kotlin标准库中的Sequence,zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。

fun collectFlow() = runBlocking<Unit> {
       val  nubs=(1..3).asFlow()
       val strs= flowOf("one","Two","Three")
        nubs.zip(strs){
            a,b->"$a ->$b"
        }.collect{
            println(it)
        }
    }

1、Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的
通信。channel默认容量1。

fun ceshiChannel() = runBlocking<Unit> {
        //生产者
        val channel = Channel<Int>()
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            while (true) {
               val element=channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer,consumer)
    }

②迭代Channel
Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator.

fun ceshiChannel2() = runBlocking<Unit> {
        //生产者
        val channel = Channel<Int>()
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x)
                println("send $x")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            val iterator =channel.iterator()
            while (iterator.hasNext()) {
                val element=channel.receive()
                println("receive $element")
                delay(2000)
            }
            //也或者可以以下写法
            for(element in channel){
                println("receive $element")
                delay(2000)
            }
        }
        joinAll(producer,consumer)
    }

③、produce与actor 构造生产者与消费者的便捷方法
我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了。反过来,我们可以用actor启动一个消费者协程

fun ceshiChannel3() = runBlocking<Unit> {
        //生产者
        val receiveChannel:ReceiveChannel<Int> = GlobalScope.produce {
            repeat(100){
                delay(1000)
                send(it)
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            for(element in receiveChannel){
                println("receive $element")
            }
        }
        joinAll(consumer)
    }
    fun ceshiChannel4() = runBlocking<Unit> {
        //消费者
        val sendChannel:SendChannel<Int> = GlobalScope.actor {
            while (true){
                val element=receive()
                println(element)
            }
        }
        //生产者
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                sendChannel.send(x)
            }
        }
        producer.join()
    }

④、Channel的关闭
produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称为热数据流。
对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedforSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive才会返回true。
Channel的生命周期最好由主导方来维护,建该由主导的一方实现关闭。
⑤、BroadcastChannel
前面提到,发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然多个接收端不存在互斥行为。

  fun ceshiChannel5() = runBlocking<Unit> {
        //消费者 也可以通过下面的broadcast进行转换
        val broadcastChannel= BroadcastChannel<Int>(Channel.BUFFERED)
//        val channel = Channel<Int>()
//        val broadcastChannel=channel.broadcast(3)
        val producer = GlobalScope.launch {
            List(3){
                delay(1000)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        //消费者
        List(3){index->
            GlobalScope.launch {
                val receiveChannel=broadcastChannel.openSubscription()
                for (x in receiveChannel) {
                    println(x)
                }
            }
        }.joinAll()
    }

⑥、await多路复用
两个API分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示。
⑦、多个Channel复用
跟await类似,会接收到最快的那个channel消息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容

  • 1、Kotlin 协程(线程)切换[https://blog.csdn.net/ezconn/article/de...
    落寞1990阅读 95评论 0 0
  • 1. 什么是kotlin协程, 与线程的区别有哪些 Kotlin 中的协程是一种并发设计模式,它使得可以用顺序的方...
    vb12阅读 245评论 0 1
  • [TOC] 简介 Coroutines are computer program components that ...
    Whyn阅读 5,914评论 5 15
  • 概念 实际就是kotlin官方提供的线程API,相当于AsyncTask 特性:非阻塞挂起,可挂起/恢复执行 本质...
    Peakmain阅读 1,896评论 0 10
  • 目录 [toc] 1、协程是什么 如果我们去维基百科,可以找到一段类似的话: 协程是一种非抢占式或者说协作式的计算...
    denny_z阅读 1,204评论 0 5