一、Channel
Channel相较于Flow,Flow是冷流,本质上可以说是一个单线程操作,只有开始收集时,上流代码才会启动,而Channel是一个并发安全的队列,可以用来连接不同的协程,实现不同协程之间的通信
1.Channel的使用
创建一个Channel对象,在不同协程中调用其send和receive函数
fun `test channel`() = runBlocking {
val channel = Channel<Int>()
var isFinished = false
//生产者
val producer = GlobalScope.launch {
var i = 0
while (i < 3) {
delay(200)
//发送产品
channel.send(++i)
}
isFinished = true
}
//消费者
val consumer = GlobalScope.launch {
while (!isFinished) {
//接收产品
println(channel.receive())
}
}
joinAll(producer, consumer)
}
结果:
1
2
3
2.Channel的容量
Channel的容量或者说缓冲区大小,默认为0,当消费者消费慢了,那么生产者会等待,反之生产者生产慢了,消费者会等待。如果想要指定缓冲区大小,可以在构建时传入
3.Channel迭代器
除了使用receive函数外,Channel还提供了迭代器用来接收数据
fun `test channel iterator`() = runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
val producer = GlobalScope.launch {
for (i in 0..5) {
channel.send(i)
}
}
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while(iterator.hasNext()){
val element = iterator.next()
println(element)
}
// for(value in channel){
// println(value)
// }
}
joinAll(producer,consumer)
}
4.produce与actor
在协程中,可以使用produce启动一个生产者协程,并返回ReceiveChannel
fun `test channel produce`() = runBlocking {
val receiveChannel = GlobalScope.produce {
repeat(3){
delay(100)
send(it)
}
}
val consumer = GlobalScope.launch {
for(i in receiveChannel){
println(i)
}
}
consumer.join()
}
反之使用actor启动一个消费者协程
fun `test channel acotr`() = runBlocking {
val sendChannel = GlobalScope.actor<Int> {
while (true) {
delay(100)
println(receive())
}
}
val producer = GlobalScope.launch {
for (i in 0..3) {
sendChannel.send(i)
}
}
producer.join()
}
5.Channel的关闭
produce和actor返回的channel都会随着对应协程执行结束后自动关闭
我们也可以使用close方法手动关闭,它会立即停止发送元素,此时isClosedForSend会立即返回true,而由于缓冲区的存在,所有元素读取完毕后,isClosedForReceive才会返回true
fun `test channel close`() = runBlocking {
val channel = Channel<Int>(3)
val produce = GlobalScope.launch {
for (i in 1..3) {
channel.send(i)
}
channel.close()
println(
"produce isClosedForSend: ${channel.isClosedForSend}" +
" isClosedForReceive: ${channel.isClosedForReceive}"
)
}
val consumer = GlobalScope.launch {
for (i in channel) {
delay(100)
println(i)
}
println(
"consumer isClosedForSend: ${channel.isClosedForSend}" +
"isClosedForReceive: ${channel.isClosedForReceive}"
)
}
joinAll(produce, consumer)
}
结果:
produce isClosedForSend: true isClosedForReceive: false
1
2
3
consumer isClosedForSend: trueisClosedForReceive: true
6.BroadcastChannel
kotlin还提供了发送端一对多接收端的方式,使用BroadcastChannel需要指定其缓冲区大小,或使用Channel.BUFFERED。还可以使用channel对象的broadcast函数来获取BroadcastChannel对象
fun `test channel broadcast`() = runBlocking {
// val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
val channel = Channel<Int>(3)
val broadcastChannel = channel.broadcast()
GlobalScope.launch {
(0..3).forEach {
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) { index ->
GlobalScope.launch {
val channel = broadcastChannel.openSubscription()
for (i in channel) {
println("index : $index receive: $i")
}
}
}.joinAll()
}
结果:
index : 1 receive: 0
index : 1 receive: 1
index : 1 receive: 2
index : 1 receive: 3
index : 0 receive: 0
index : 2 receive: 0
index : 2 receive: 1
index : 2 receive: 2
index : 2 receive: 3
index : 0 receive: 1
index : 0 receive: 2
index : 0 receive: 3
Process finished with exit code 0
二、多路复用
数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效利用通道线路,希望一个信道同时传输多路信号,这就是多路复用技术
1.多个await复用--select
获取数据可能有多个方法,每个方法的耗时可能不太一样,调用这些方法后,我们希望哪个方法先返回就使用它的返回值
data class Response<T>(var data: T, var random: Long)
fun CoroutineScope.getInfoForLocal1() = async {
// 随机一个等待值
val random = (0..201).shuffled().first().toLong()
delay(random)
Response("info for local1", random)
}
fun CoroutineScope.getInfoForLocal2() = async {
// 随机一个等待值
val random = (0..201).shuffled().first().toLong()
delay(random)
Response("info for local2", random)
}
fun `test await select`() = runBlocking {
val local1 = getInfoForLocal1()
val local2 = getInfoForLocal2()
val select = select<Response<String>> {
local1.onAwait { response ->
response //可以改成其他类型的
}
local2.onAwait { response ->
response
}
}
println("$select")
}
结果:
Response(data=info for local1, random=56)
2.多个Channel复用
和await差不多
fun `test channel select`() = runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(50)
channels[0].send(0)
}
GlobalScope.launch {
delay(100)
channels[1].send(1)
}
val select = select<Int> {
channels.forEach {
it.onReceive { value ->
value
}
}
}
println(select)
delay(1000)
}
结果
0
3.SelectCause
并不是所有的事件可以使用select的,只有SelectCauseN类型的事件
1.SelectCause0:对应事件没有返回值,例如join,那么onJoin就是SelectCauseN,使用时,onJoin的参数是一个无参函数
2.SelectCause1:对应事件有返回值,例如onAwait,onReceive
3.SelectCause3:对应事件有返回值,此外还要一个额外参数,例如Channel.onSend,一个参数为Channel数据类型的值,一个为发送成功时的回调
SelectCause3:
fun `test channel send select`() = runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
launch {
select<Unit?> {
launch {
delay(100)
channels[0].onSend(0) {
println("onSend 0")
}
}
launch {
delay(50)
channels[1].onSend(1) {
println("onSend 1")
}
}
}
}
GlobalScope.launch {
println(channels[0].receive())
}
GlobalScope.launch {
println(channels[1].receive())
}
}
结果:
1
onSend 1
4.使用Flow实现多路复用
fun `test flow merge`() = runBlocking {
listOf(::getInfoForLocal1, ::getInfoForLocal2)
.map { function ->
function.call()
}.map { deferred ->
flow { emit(deferred.await()) }
}.merge()
.collect {
println(it)
}
}
结果:
Response(data=info for local1, random=129)
Response(data=info for local2, random=145)
和select不同的是,Flow收集时,会收集所有结果
三、并发安全
在Java平台上的kotlin协程实现避免不了并发调度的问题,因此线程安全值得留意
fun `test sync safe1`() = runBlocking {
var count = 0;
List(1000) {
GlobalScope.launch { count++ }
}.joinAll()
println(count)
}
结果:
987
当然我们还可以使用Java是提供的线程安全类
fun `test sync safe2`() = runBlocking {
var count = AtomicInteger(0);
List(1000) {
GlobalScope.launch { count.incrementAndGet() }
}
println(count.get())
}
协程框架也提供了解决并发问题的方法:
1.上面学习的Channel
2.Mutex:轻量级锁,用法和Java的锁类似,获取不到锁时,不会阻塞线程,而是挂起等待锁的释放
fun `test sync mutex`() = runBlocking {
var count = 0;
val mutex = Mutex()
List(1000) {
GlobalScope.launch {
mutex.withLock {
count++
}
}
}.joinAll()
println(count)
}
3.Semaphore,轻量级信号量,在linux中是进程间通讯的一种方式,协程获取到信号量后即可执行并发操作。当Semaphore为1时,效果等价于Mutex
fun `test sync semaphore`() = runBlocking {
var count = 0;
val semaphore = Semaphore(1)
List(1000) {
GlobalScope.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()
println(count)
}
4.我们也可以避免访问外部变量,基于参数作运算,通过返回值提供运算结果
fun `test sync avoid`() = runBlocking {
var count = 0;
count += List(1000) {
GlobalScope.async {
1
}
}.map {
it.await()
}.sum()
println(count)
}