多线程通信往往是个很头疼的事情,Coroutine也不例外。多线程中有SynchronousQueue、
Exchanger等,Coroutine提供了Channel这个工具用于在coroutine之间分享数据。一个coroutine使用Channel发送数据,另一个coroutine可以使用这个Channel接收数据,如下图:
发送数据的coroutine被称为生产者,接收数据的被称为消费者。Channel并不限于一个生产者和一个消费者,可以同时有多个生产者和多个消费者。如下图:
当多个消费者接受同一个Channel的数据时,数据被这些消费者一个一个的处理(类似遍历)。自动处理意味着从Channel中移除这个数据。
Channel在作用下可以和集合做一个类比,数据放到集合尾部,然后其他地方接收数据。但是不同于任何一种集合,channel会在send
或者receive
的时候让coroutine处于suspned
状态。这通常发生在channel数据是空的时候(receive)或者满的时候(send)。
Channel实现了两个接口SendChannel
和ReceiveChannel
。很明显,生产者使用的是SendChannel
,消费者使用的是ReceiveChannel
。
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
生产者可以通过关闭channel来表示没有更多数据了。
Coroutine库中定义了一些channel。他们的不同点在于可以存储多少数据,send
方法调用的时候是否可以suspend
。对于所有的Channel实现类来说,receive
的行为都是一致的,如果channel中有数据接收数据,没有就处于suspend状态。
Channel类型
-
Unlimited channel
生产者可以无限制发送数据,直到OOM
-
Buffered channel
限制个数的channel,如果channel中的数据满了,生产者再发送数据的时候就会进入suspend状态直到channel中有空位。
-
“Rendezvous” channel
这个很像Java中的SynchronousQueue,一旦生产者发送数据后就会处于suspend状态直到用消费者取走这个数据。
-
Conflated channel
生产者产生的数据如果没有被消费,那么新生产的数据就会覆盖之前的数据。当然send
方法永远都不会suspned。
默认的Channel就是“Rendezvous” channel。