kotlin--Channel、多路复用、并发安全

一、Channel

Channel相较于Flow,Flow是冷流,本质上可以说是一个单线程操作,只有开始收集时,上流代码才会启动,而Channel是一个并发安全的队列,可以用来连接不同的协程,实现不同协程之间的通信
1.Channel的使用

创建一个Channel对象,在不同协程中调用其send和receive函数

fun `test channel`() = runBlocking {
    val channel = Channel<Int>()
    var isFinished = false
    //生产者
    val producer = GlobalScope.launch {
        var i = 0
        while (i < 3) {
            delay(200)
            //发送产品
            channel.send(++i)
        }
        isFinished = true
    }

    //消费者
    val consumer = GlobalScope.launch {
        while (!isFinished) {
            //接收产品
            println(channel.receive())
        }
    }

    joinAll(producer, consumer)
}

结果:
1
2
3

2.Channel的容量

Channel的容量或者说缓冲区大小,默认为0,当消费者消费慢了,那么生产者会等待,反之生产者生产慢了,消费者会等待。如果想要指定缓冲区大小,可以在构建时传入

3.Channel迭代器

除了使用receive函数外,Channel还提供了迭代器用来接收数据

fun `test channel iterator`() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    val producer = GlobalScope.launch {
        for (i in 0..5) {
            channel.send(i)
        }
    }

    val consumer = GlobalScope.launch { 
        val iterator = channel.iterator()
        while(iterator.hasNext()){
            val element = iterator.next()
            println(element)
        }
        
//        for(value in channel){
//            println(value)
//        }
    }
    
    joinAll(producer,consumer)
}
4.produce与actor

在协程中,可以使用produce启动一个生产者协程,并返回ReceiveChannel

fun `test channel produce`() = runBlocking {
    val receiveChannel = GlobalScope.produce {
        repeat(3){
            delay(100)
            send(it)
        }
    }
    
    val consumer = GlobalScope.launch { 
        for(i in receiveChannel){
            println(i)
        }
    }
    
    consumer.join()
}

反之使用actor启动一个消费者协程

fun `test channel acotr`() = runBlocking {
    val sendChannel = GlobalScope.actor<Int> {
        while (true) {
            delay(100)
            println(receive())
        }
    }

    val producer = GlobalScope.launch {
        for (i in 0..3) {
            sendChannel.send(i)
        }
    }

    producer.join()
}
5.Channel的关闭

produce和actor返回的channel都会随着对应协程执行结束后自动关闭
我们也可以使用close方法手动关闭,它会立即停止发送元素,此时isClosedForSend会立即返回true,而由于缓冲区的存在,所有元素读取完毕后,isClosedForReceive才会返回true

fun `test channel close`() = runBlocking { 
    val channel = Channel<Int>(3)
    val produce = GlobalScope.launch {
        for (i in 1..3) {
            channel.send(i)
        }

        channel.close()

        println(
            "produce isClosedForSend: ${channel.isClosedForSend}" +
                    " isClosedForReceive: ${channel.isClosedForReceive}"
        )
    }

    val consumer = GlobalScope.launch {
        for (i in channel) {
            delay(100)
            println(i)
        }

        println(
            "consumer isClosedForSend: ${channel.isClosedForSend}" +
                    "isClosedForReceive: ${channel.isClosedForReceive}"
        )
    }

    joinAll(produce, consumer)
}

结果:
produce isClosedForSend: true isClosedForReceive: false
1
2
3
consumer isClosedForSend: trueisClosedForReceive: true

6.BroadcastChannel

kotlin还提供了发送端一对多接收端的方式,使用BroadcastChannel需要指定其缓冲区大小,或使用Channel.BUFFERED。还可以使用channel对象的broadcast函数来获取BroadcastChannel对象

fun `test channel broadcast`() = runBlocking {
//    val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    val channel = Channel<Int>(3)
    val broadcastChannel = channel.broadcast()
    GlobalScope.launch {
        (0..3).forEach {
            broadcastChannel.send(it)
        }

        broadcastChannel.close()
    }

    List(3) { index ->
        GlobalScope.launch {
            val channel = broadcastChannel.openSubscription()
            for (i in channel) {
                println("index : $index  receive: $i")
            }
        }
    }.joinAll()
}

结果:
index : 1 receive: 0
index : 1 receive: 1
index : 1 receive: 2
index : 1 receive: 3
index : 0 receive: 0
index : 2 receive: 0
index : 2 receive: 1
index : 2 receive: 2
index : 2 receive: 3
index : 0 receive: 1
index : 0 receive: 2
index : 0 receive: 3

Process finished with exit code 0

二、多路复用

数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效利用通道线路,希望一个信道同时传输多路信号,这就是多路复用技术

1.多个await复用--select

获取数据可能有多个方法,每个方法的耗时可能不太一样,调用这些方法后,我们希望哪个方法先返回就使用它的返回值

data class Response<T>(var data: T, var random: Long)

fun CoroutineScope.getInfoForLocal1() = async {
    // 随机一个等待值
    val random = (0..201).shuffled().first().toLong()
    delay(random)
    Response("info for local1", random)
}

fun CoroutineScope.getInfoForLocal2() = async {
    // 随机一个等待值
    val random = (0..201).shuffled().first().toLong()
    delay(random)
    Response("info for local2", random)
}

fun `test await select`() = runBlocking {
    val local1 = getInfoForLocal1()
    val local2 = getInfoForLocal2()

    val select = select<Response<String>> {
        local1.onAwait { response ->
            response //可以改成其他类型的
        }
        local2.onAwait { response ->
            response
        }
    }

    println("$select")
}

结果:
Response(data=info for local1, random=56)

2.多个Channel复用

和await差不多

fun `test channel select`() = runBlocking {
    val channels = listOf(Channel<Int>(), Channel<Int>())

    GlobalScope.launch {
        delay(50)
        channels[0].send(0)
    }

    GlobalScope.launch {
        delay(100)
        channels[1].send(1)
    }

    val select = select<Int> {
        channels.forEach {
            it.onReceive { value ->
                value
            }
        }
    }

    println(select)
    delay(1000)
}

结果
0

3.SelectCause

并不是所有的事件可以使用select的,只有SelectCauseN类型的事件
1.SelectCause0:对应事件没有返回值,例如join,那么onJoin就是SelectCauseN,使用时,onJoin的参数是一个无参函数
2.SelectCause1:对应事件有返回值,例如onAwait,onReceive
3.SelectCause3:对应事件有返回值,此外还要一个额外参数,例如Channel.onSend,一个参数为Channel数据类型的值,一个为发送成功时的回调

SelectCause3:

fun `test channel send select`() = runBlocking {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    launch {
        select<Unit?> {
            launch {
                delay(100)
                channels[0].onSend(0) {
                    println("onSend 0")
                }
            }

            launch {
                delay(50)
                channels[1].onSend(1) {
                    println("onSend 1")
                }
            }
        }
    }

    GlobalScope.launch {
        println(channels[0].receive())
    }

    GlobalScope.launch {
        println(channels[1].receive())
    }
    
}

结果:
1
onSend 1

4.使用Flow实现多路复用
fun `test flow merge`() = runBlocking {
    listOf(::getInfoForLocal1, ::getInfoForLocal2)
        .map { function ->
            function.call()
        }.map { deferred ->
            flow { emit(deferred.await()) }
        }.merge()
        .collect {
            println(it)
        }
}

结果:
Response(data=info for local1, random=129)
Response(data=info for local2, random=145)

和select不同的是,Flow收集时,会收集所有结果

三、并发安全

在Java平台上的kotlin协程实现避免不了并发调度的问题,因此线程安全值得留意

fun `test sync safe1`() = runBlocking {
    var count = 0;
    List(1000) {
        GlobalScope.launch { count++ }
    }.joinAll()

    println(count)
}

结果:
987

当然我们还可以使用Java是提供的线程安全类

fun `test sync safe2`() = runBlocking {
    var count = AtomicInteger(0);
    List(1000) {
        GlobalScope.launch { count.incrementAndGet() }
    }

    println(count.get())
}
协程框架也提供了解决并发问题的方法:
1.上面学习的Channel
2.Mutex:轻量级锁,用法和Java的锁类似,获取不到锁时,不会阻塞线程,而是挂起等待锁的释放
fun `test sync mutex`() = runBlocking {
    var count = 0;
    val mutex = Mutex()
    List(1000) {
        GlobalScope.launch {
            mutex.withLock {
                count++
            }
        }
    }.joinAll()

    println(count)
}
3.Semaphore,轻量级信号量,在linux中是进程间通讯的一种方式,协程获取到信号量后即可执行并发操作。当Semaphore为1时,效果等价于Mutex
fun `test sync semaphore`() = runBlocking {
    var count = 0;
    val semaphore = Semaphore(1)
    List(1000) {
        GlobalScope.launch {
            semaphore.withPermit {
                count++
            }
        }
    }.joinAll()

    println(count)
}
4.我们也可以避免访问外部变量,基于参数作运算,通过返回值提供运算结果
fun `test sync avoid`() = runBlocking {
    var count = 0;
    count += List(1000) {
        GlobalScope.async {
            1
        }
    }.map {
        it.await()
    }.sum()

    println(count)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容