1、Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的
通信。channel默认容量1。
fun ceshiChannel() = runBlocking<Unit> {
//生产者
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消费者
val consumer = GlobalScope.launch {
while (true) {
val element=channel.receive()
println("receive $element")
}
}
joinAll(producer,consumer)
}
②迭代Channel
Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator.
fun ceshiChannel2() = runBlocking<Unit> {
//生产者
val channel = Channel<Int>()
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x)
println("send $x")
}
}
//消费者
val consumer = GlobalScope.launch {
val iterator =channel.iterator()
while (iterator.hasNext()) {
val element=channel.receive()
println("receive $element")
delay(2000)
}
//也或者可以以下写法
for(element in channel){
println("receive $element")
delay(2000)
}
}
joinAll(producer,consumer)
}
③、produce与actor 构造生产者与消费者的便捷方法
我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了。反过来,我们可以用actor启动一个消费者协程
fun ceshiChannel3() = runBlocking<Unit> {
//生产者
val receiveChannel:ReceiveChannel<Int> = GlobalScope.produce {
repeat(100){
delay(1000)
send(it)
}
}
//消费者
val consumer = GlobalScope.launch {
for(element in receiveChannel){
println("receive $element")
}
}
joinAll(consumer)
}
fun ceshiChannel4() = runBlocking<Unit> {
//消费者
val sendChannel:SendChannel<Int> = GlobalScope.actor {
while (true){
val element=receive()
println(element)
}
}
//生产者
val producer = GlobalScope.launch {
for (x in 1..5) {
sendChannel.send(x)
}
}
producer.join()
}
④、Channel的关闭
produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称为热数据流。
对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedforSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive才会返回true。
Channel的生命周期最好由主导方来维护,建该由主导的一方实现关闭。
⑤、BroadcastChannel
前面提到,发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然多个接收端不存在互斥行为。
fun ceshiChannel5() = runBlocking<Unit> {
//消费者 也可以通过下面的broadcast进行转换
val broadcastChannel= BroadcastChannel<Int>(Channel.BUFFERED)
// val channel = Channel<Int>()
// val broadcastChannel=channel.broadcast(3)
val producer = GlobalScope.launch {
List(3){
delay(1000)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
//消费者
List(3){index->
GlobalScope.launch {
val receiveChannel=broadcastChannel.openSubscription()
for (x in receiveChannel) {
println(x)
}
}
}.joinAll()
}
⑥、await多路复用
两个API分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示。
⑦、多个Channel复用
跟await类似,会接收到最快的那个channel消息。
2、Flow
①、Flow异步流
Flow最主要的作用在于异步返回多个值,文件下载就是Flow最经典的一个应用场景。
冷流还是热流
Flow 是冷流,什么是冷流?简单来说,如果 Flow 有了订阅者 Collector 以后,发射出来的值才会实实在在的存在于内存之中,这跟加载的概念很像。
与之相对的是热流,StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。
②、StateFlow是一个状态容器式可观察数据流,StateFlow是可以向其收集器发出当前状态更新和新状态更新。还可通过其value属性读取当前状态值。
③、SharedFlow 会向从其中收集值的所有使用方发出数据。