Kotlin协程之通道-多路复用-并发安全

认识通道
  • Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。
fun testKnowChannel() = runBlocking {
    val channel = Channel<Int>() //默认通道缓存容量为 0
        //生产者
        val producer = GlobalScope.launch {
            var i = 0
            while(true){
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
    // 消费者
    val consumer = GlobalScope.launch {
        while(true){
//            delay(2000)
            val element = channel.receive()
            println("receive $element")
        }
    }
    joinAll(producer,consumer)
}

通道缓存是0,会发送一个消费一个,如果消费的比较慢,发送方会将send挂起等到消费完了再继续

Channel的容量
  • Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有调用receive并取走函数,send就需要挂起,故意让接受端的节奏放慢,发现send总是会挂起,知道receive之后才继续往下执行
迭代Channel
  • Channel本身确实像序列,所以我们在读取的时候可以直接取一个Channel的iterator
fun testIterateChannel() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    //生产者
    val producer = GlobalScope.launch {
       for(i in 1..5){
           channel.send(i * i)
           println("send ${i * i}")
       }
    }
    // 消费者
    val consumer = GlobalScope.launch {
//        val iterator = channel.iterator()
//        while(iterator.hasNext()){
//            val element= iterator.hasNext()
//            println("receive $element")
//            delay(2000)
//        }
        for(element in channel) {
            println("receive $element")
            delay(2000)
        }
    }
    joinAll(producer,consumer)
}

设置通道大小后会将5个结果一下子发送出来,消费者会慢慢进行消费

produce与actor
  • 构造生产者与消费者的便捷方法
  • 我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程可以用这个Channel来接受数据了,反过来,我们可以用actor启动一个消费者协程
fun testProduceActorChannel() = runBlocking {
//    val receiveChannel = GlobalScope.produce { //生产者
//        repeat(100){
//            delay(1000)
//            send(it)
//        }
//    }
//    val consumer = GlobalScope.launch { // 消费者
//        for(i in receiveChannel) {
//            println("receive $i")
//        }
//    }
//    consumer.join()

    val sendChannel = GlobalScope.actor<Int> { // 消费者
        while(true){
            val element = receive()
            println("receive $element")
        }
    }

    val produce = GlobalScope.launch { // 生产者
        for(i in 0..3) {
            sendChannel.send(i)
        }
    }
    produce.join()

}
Channel的关闭
  • produce和actor返回Channel都会随着对应的协程执行完毕而关闭,也是这样,Channel才被称为热数据流
  • 对于一个Channel,如果我们调用了它的close,它会立即停止接受新元素,也就是说它的isCloseForSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有元素都被读取后isClosedForReceive才会返回true
  • Channel的生命周期最好由主导仿来维护,建议由主导的一方实现关闭
fun testCloseChannel() = runBlocking {
    val channel = Channel<Int>(3)

    val produce = GlobalScope.launch { // 生产者
        List(3) {
            channel.send(it)
            println("send $it")
        }
        channel.close()
        println("""
               -CloseForSend:${channel.isClosedForSend} -CloseForReceive:${channel.isClosedForReceive}
            """.trimIndent())
    }

    val consumer = GlobalScope.launch { // 消费者
        for(element in channel) {
            println("receive: $element")
        }
        println("""
               -CloseForSend:${channel.isClosedForSend} -CloseForReceive:${channel.isClosedForReceive}
            """.trimIndent())
    }
    joinAll(produce,consumer)
}
BroadcaseChannel
  • 发送端和接收端在Channel中存在一对多的情况,从数据本身来讲,虽然有多个接受端,但是同一个元素只会被一个接收端读到。广播则不然,多个接收端不存在互斥的行为。
fun testBroadcastChannel() = runBlocking {
    val broadcast = BroadcastChannel<Int>(Channel.BUFFERED)
//    val channel = Channel<Int>()
//    val broadcast = channel.broadcast(10)// 可以进行转换
    val produce = GlobalScope.launch { // 生产者
        List(3) {
            delay(100)
            broadcast.send(it)
            println("send $it")
        }
        broadcast.close()
    }
    List(3){
        GlobalScope.launch { // 消费者
            val receiveChannel = broadcast.openSubscription()
            for(element in receiveChannel) {
                println("[$it] receive: $element")
            }
        }
    }.joinAll()
}
多路复用
  • 数据通信系统或者计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路,希望一个信道同时传输多路信号,这就是所谓的多路复用技术(Multiplexing)
复用多个await
  • 两个API分别从网络和本地缓存中获取数据期望哪个先返回就先用哪个做展示
fun CoroutineScope.getUserFromLocal() = async(Dispatchers.IO) {
    // 模拟读取本地数据
    delay(1000)
    "getUserFromLocal"
}

fun CoroutineScope.getUserFromNetwork() = async(Dispatchers.IO) {
    // 模拟读取网络数据
    delay(500)
    "getUserFromNetwork"
}

fun testSelectAwait() = runBlocking {
    GlobalScope.launch {
        val userFromLocal = getUserFromLocal()
        val userFromNetwork = getUserFromNetwork()
        val select = select<String> {
            userFromLocal.onAwait { it }
            userFromNetwork.onAwait { it }
        }
        println(select)
    }.join()
}
复用多个Channel
  • 跟await类似,会接收到最快的那个Channel消息
fun testSelectChannel() = runBlocking {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    GlobalScope.launch {
        delay(100)
        channels[0].send(200)
    }
    GlobalScope.launch {
        delay(50)
        channels[1].send(300)
    }
    val result = select<Int> {
        channels.forEach {
            it.onReceive { it }
        }
    }
    println(result)
    delay(1000)
}
SelectCause
  • 所有能够被select的事件都是SelectClauseN类型
  1. SelectCause0:对应事件没有返回值,例如join,那么onJoin就是SelectCauseN,使用时,onJoin的参数是一个无参函数
  2. SelectCause1:对应事件有返回值,例如onAwait,onReceive
  3. SelectCause2:对应事件有返回值,此外还要一个额外参数,例如Channel.onSend,一个参数为Channel数据类型的值,一个为发送成功时的回调
  • 如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可
fun testChannelSendSelect() = runBlocking {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    launch {
        select<Unit?> {
            launch {
                delay(100)
                channels[0].onSend(0) {
                    println("onSend 0")
                }
            }

            launch {
                delay(50)
                channels[1].onSend(1) {
                    println("onSend 1")
                }
            }
        }
    }

    GlobalScope.launch {
        println(channels[0].receive())
    }

    GlobalScope.launch {
        println(channels[1].receive())
    }
    
}
使用Flow实现多路复用
fun testFlowMerge() = runBlocking {
    listOf(::getInfoForLocal1, ::getInfoForLocal2)
        .map { function ->
            function.call()
        }.map { deferred ->
            flow { emit(deferred.await()) }
        }.merge()
        .collect {
            println(it)
        }
}
并发安全
  • 在Java平台上的kotlin协程实现避免不了并发调度的问题,因此线程安全值得留意
fun `testSyncSafe1`() = runBlocking {
    var count = 0;
    List(1000) {
        GlobalScope.launch { count++ }
    }.joinAll()

    println(count)
}

以上输出的结果肯定都是小于1000的,不是原子性的操作是线程不安全的

  • Java是提供的线程安全类
fun `test sync safe2`() = runBlocking {
    var count = AtomicInteger(0);
    List(1000) {
        GlobalScope.launch { count.incrementAndGet() }
    }

    println(count.get())
}
协程的并发工具
  • 除了我们在线程中常用的解决并发问题外,协程框架也提供了一些并发安全的工具
  1. Channel:并发安全的消息通道
  2. Mutex:轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放
fun testSyncMutex() = runBlocking {
    var count = 0;
    val mutex = Mutex()
    List(1000) {
        GlobalScope.launch {
            mutex.withLock {
                count++
            }
        }
    }.joinAll()

    println(count)
}
  1. Semaphore:轻量级信号量,信号量可以有多个,协程在获取信号量后即可执行并发操作。当Semaphore的参数为1时,等价于Mutex
fun testSyncSemaphore() = runBlocking {
    var count = 0;
    val semaphore = Semaphore(1)
    List(1000) {
        GlobalScope.launch {
            semaphore.withPermit {
                count++
            }
        }
    }.joinAll()

    println(count)
}
  1. 我们也可以避免访问外部变量,基于参数作运算,通过返回值提供运算结果
fun testSyncAvoid`() = runBlocking {
    var count = 0;
    count += List(1000) {
        GlobalScope.async {
            1
        }
    }.map {
        it.await()
    }.sum()

    println(count)
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容