android kotlin 协程(四) 协程间的通信
学完本篇你将会了解到:
- channel
- produce
- actor
- select
先来通过上一篇的简单案例回顾一下挂起于恢复:
fun main() {
val waitTime = measureTimeMillis {
runBlocking<Unit> {
println("main start") // 1 // 调度前
launch {
println("launch 1 start") // 2 // 调度后(执行前)
delay(1000) // 延迟1s (不会阻塞兄弟协程)
println("launch 1 end") // 3
}
println("main mid") // 4 // 调度前
launch {
println("launch 2 start") // 5 // 调度后执行
delay(500) // 延迟0.5s (不会阻塞兄弟协程)
println("launch 2 end") // 6
}
println("main end") // 7 // 调度前
}
}
println("等待时间:${waitTime}")
}
通过上一篇我们知道了在协程中,
是会先执行调度前的代码,然后会执行调度后的代码, 直到调度后的时候,才会真正的执行到协程体中
所以这段代码的执行顺序为:
1,4,7,2,5,6,3
launch{} 中的lambda表达式 是一个suspend 函数标记的,所以始终是异步的,并不会阻塞兄弟协程
所以等待时间 约等于 1000
这里为什么说是约等于呢? 因为创建协程等一系列操作会稍微耗时一点,直接取整即可!
Channel
send / receive
channel是用来协程之前通信的,例如现在有一个需求,B协程需要使用A协程中的某个值,那么就用到了channel
先来看个最简单的例子
可以看出,A协程可以完成发送,并且B协程也可以完成接受
如果说A协程是一个网络接口,会返回数据,此时B协程是否还会等待A协程数据返回呢?
可以看出,即使是A协程会延迟2s,那么B协程也会等待A协程返回
如果说,A协程现在有3条数据要发送,B协程是否会接受3条呢?
那么就要介绍 Channel()的第一个参数了:
- capacity 通道容量
channel 类似于一个阻塞队列(BlockingQueue), 默认是只缓存1条数据,只要不取,那么新的数据就无法加入到容器中
当send第二条数据的时候, 发现并没有receive() 来取第二条数据,所以就会出现一直挂起的效果
此时我们只需要让channel通道中容量变大,多存放几条数据即可
例如这样:
如果说,我们不想改变通道容量的大小,并且, 还要不让他挂起,那么就要介绍 channel的第二个参数了:
- onBufferOverflow
从名字也可以看出,这是缓冲区溢出策略,一共有三种状态
- BufferOverflow.SUSPEND: 挂起策略,当send不进去数据的时候,始终挂起,等待 receive() [默认]
- BufferOverflow.DROP_OLDEST: 当要溢出的时候,删除缓冲区中最旧的值
- BufferOverflow.DROP_LASTEST: 当要溢出的时候,删除缓冲区中最新的值
还是上面的例子,我们将容量设置为1, 往 channel中send 3条数据来看看效果
BufferOverflow.DROP_OLDEST | BufferOverflow.DROP_LASTEST |
---|---|
目前这些代码应该很好理解!
trySend / tryReceive
在新版的channel更新的api中,还增添了一系列 tryXXapi
来看一段代码:
-
trySend() 尝试向channel中发送数据。这个函数会立即返回一个结果,表明是否成功将元素发送到通道中。如果通道已满,它会立即返回一个
Failure
类型的结果,否则会返回一个Success
类型的结果。一般来说,生产者协程使用trySend
函数来尝试将数据发送到通道中,不会阻塞协程,同时可以通过返回结果来判断是否成功发送数据。 -
tryReceive() 这个函数会立即返回一个结果,表明是否成功从通道中接收到元素。如果通道已空,它会立即返回一个
Failure
类型的结果,否则会返回一个Success
类型的结果。一般来说,消费者协程使用tryReceive
函数来尝试从通道中接收数据,不会阻塞协程,同时可以通过返回结果来判断是否成功接收数据。
// TODO =================== trySend / tryReceive ======================
fun main() = runBlocking<Unit> {
// 用来协程间的通信
val channel = Channel<String>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
println("main start")
launch { // A协程
// channel.close()
val trySend = channel.trySend("A协程发送数据 1")
if (trySend.isSuccess) {
println("channel 发送成功")
} else if (trySend.isClosed) {
println("channel 关闭了")
} else if (trySend.isFailure) {
println("channel 发送失败")
}
}.join() // A协程必须执行完,通道有数据了之后才能取
val tryReceive = channel.tryReceive()
if (tryReceive.isSuccess) {
println("tryReceive 接收到了数据:${tryReceive.getOrNull()}")
} else if (tryReceive.isClosed) {
println("tryReceive 关闭了")
} else if (tryReceive.isFailure) {
println("tryReceive 发送失败")
}
println("main end")
}
运行结果:
还有一些比较老的方法例如:
- offer / poll 等一些淘汰的方法就不说了,
onSend / onReceive
还有最后一种发送,获取数据的方式,这种方式是通过select 选择器来实现的,先来看代码
//// TODO =================== onSend / onReceive ======================
fun main() = runBlocking<Unit> {
// 用来协程间的通信
val channel = Channel<String>(capacity = 5, onBufferOverflow = BufferOverflow.SUSPEND)
println("main start")
launch { // A 协程 发送数据
channel.send("send发送数据 ")
channel.trySend("trySend发送数据")
}
// select 接收数据 默认会挂起,等待数据返回
select {
channel.onReceive {
println("onReceive:$it")
}
}
println("result ${channel.receive()}")
channel.invokeOnClose {
println("channel close ")
}
println("main end")
}
运行结果:
select作用不止这些,目前了解可以接受即可,下面会重点提到!
这里有一个小知识:
如果看到有这种Select开头的,基本都是要写到select{} 中才能使用
运行结果:
select 下面会提到,这里就不重点说了.
在实际开发中,对于我来说,channel用的还是比较少, 我感觉这玩意比较坑,一般情况下,要实现2个协程通信,我会采用flow
例如这样:
这篇重点不是flow,这里就不多说了!
produce / actor
produce
produce意为生产者, 其本质就是对协程和channel的一层封装,
它返回一个 ReceiveChannel
对象,这个对象可以用于在其他协程中消费 生产者协程产生的数据。
使用很简单
:
api还是调用的channel的,对我们来说只是省略了, new Channel() 的过程,这里就不多说了
actor
actor 与produce正好相反
actor本质也是对协程与channel的封装, 它会返回一个SendChannel
对象,这个对象用来给协程体发送数据
select
定义: 一旦某个挂起函数返回结果,select
结构就会立即返回该函数的结果,而其他仍在等待的挂起函数将会被取消。
注意点: select
只能用于挂起函数(即使用 suspend
修饰的函数)。另外,select
的选择器表达式中每个分支都应该返回相同类型的值,否则会编译报错。
简单的说就是, select 可以找到哪一个协程执行最快, 吧执行最快的结果返回,其他执行慢的,或者没有执行的协程全部关闭!
假设我们现在有一个实际的应用场景:
在实际开发中,我们需要请求接口, 请求接口的之前需要判断是否有缓存,
如果有缓存,就使用缓存数据
但是,如果请求接口比读取缓存数据还快,那么我们就用请求出来的数据
一般情况下缓存永远比请求数据快,这里就举个例子
select不仅可以监听 async的返回, 还有很多用处,例如可以监听协程是否执行完, 并且返回最快执行完的协程
来看看代码:
下篇预告:
- suspendCoroutine{}
- suspendCancellableCoroutine{}
- suspend 与 continuation与状态机器
- 不通过协程运行 suspend函数
原创不易,您的点赞就是对我最大的帮助!