github原文地址
原创翻译,转载请保留或注明出处:https://www.jianshu.com/p/afd359a976e2
通道(Channels)
延迟值提供了一个在协程间传输单个值的便捷方式。Channel 提供了一种传输数据流的方式。
channel基础
Channel 是与 BlockingQueue
很相似的概念。一个关键的区别是它具有一个挂起 send 代替阻塞的put
操作,同时具有一个挂起 receive 代替阻塞的take
操作。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
获取完整代码 here
输出结果:
1
4
9
16
25
Done!
关闭和channel迭代
与队列不同,可以通过关闭channel来指示出不再有元素到来。在接收端,通过常规的 for 循环来接收channel里的元素是很方便的。
从概念上来说,一个 close 调用就像往channel里发送一个特殊的关闭token。一旦受到此关闭token,迭代就会停止,这样可以保证收到关闭命令前,先前所有发送的元素都可以被接收到。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
获取完整代码 here
构建channel生产者
协程生成一系列元素的模式很常见。这是“生产者-消费者模式”的一部分,这种模式经常出现在并发代码中。你可以将这样一个生产者抽象为一个用channel作为参数的函数,但这与常规意义上的“结果必须从函数返回”相违背。
在生产者端有一种便利的方式来做这件事:使用名为 produce 的协程构建器。而且消费者端可以用扩展函数 consumeEach 代替for循环。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
获取完整代码 here
流水线(Pipelines)
流水线(pipeline)是一种协程生成可能是无限的数据流的模式:
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
与此同时其他的协程消费这个流,做一些处理,并产生一些其他结果。在以下示例中数值只是被平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
主函数启动并连接整个pipeline:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
获取完整代码 here
我们不必在此示例应用中取消这些协程,因为 coroutines are like daemon threads 协程就像守护线程,但是在一个更大型的应用中,如果我们不再需要它,那就需要停止我们的pipeline。又或者,我们可以将pipeline协程作为 children of a main coroutine 主协程的子协程运行,如下所示:
使用pipeline生成素数
让我们通过一个使用协程pipeline生成素数的例子将pipeline带到极端。我们从一个无限的数值序列开始,这次我们引入一个显示的 context
参数并将其传给 produce 构建器,所以调用者可以控制我们协程的运行位置:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
以下的pipeline stage过滤传入的数值流,删除所有可被给定素数整除的数值:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
现在我们开始构建一个从2起的数值流pipeline,从当前channel获取素数,然后为每个找到的素数启动新的pipeline stage:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
以下示例打印前十个素数,在主线程的上下文中运行整个pipeline。由于所有协程都是作为主 runBlocking 协程的子协程在其 coroutineContext 上下文中启动的,因为我们不必保留一份明确的清单记录我们所启动的所有协程。我们使用 cancelChildren 扩展函数来取消所有子协程。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
获取完整代码 here
这段代码输出如下:
2
3
5
7
11
13
17
19
23
29
注意一点,你可以使用标注库中的 buildIterator 协程构建器来构建相同的pipeline。用 buildIterator
替代 produce
,yield
替代 send
,next
替代receive
,Iterator
替代 ReceiveChannel
,并去除上下文。同时也不需要使用 runBlocking
。然而如上所示,pipeline使用channel的好处是,如果在 CommonPool 上下文中运行它,实际上可以使用多个cpu核心。
无论如何,这是一种极不切实际的寻找素数的方法。实际上,pipeline确实涉及一些其他的挂起调用(比如对远程服务的异步调用),并且这些pipeline不能使用 buildSeqeunce
/buildIterator
构建,因为它们不允许任意挂起。这与完全的异步的 produce
有所不同。
扇出(Fan-out)
多个协程可以从同一个channel接收数据,在它们之间分配工作。让我们从一个定时生产整型(每秒10个数)的生产者协程开始:
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
接着我们可以有几个处理协程。在这个示例中,它们只是打印它们的id和接收数值:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
现在我们启动5个处理器,让它们工作近乎1秒钟,看看会发生什么:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
获取完整代码 here
输出会和下面这个很类似,尽管接收每个特定整型的处理器id有可能不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消生产者协程会关闭他的channel,从而最终终止处理器协程正在channel上执行的迭代。
另外,请注意我们在 launchProcessor
代码中如何使用for循环显式迭代channel来运行扇出。与consumeEach
不同,这种for
循环模式可以非常安全地在多协程环境中使用。如果其中一个处理器协程崩溃,其他处理器协程仍然会处理这条channel,而通过 consumeEach
编写的处理器总是消费(取消)底层channel,无论是正常还是异常地终止。
扇入(Fan-in)
多个协程也许会发送数据到同一个channel。例如,我们有一个字符串channel和一个挂起函数,这个函数以指定的延迟重复发送一个特定的字符串到这个channel:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
现在,让我们看看如果启动一组发送字符串的协程会发生什么(这个例子中,我们在主线程的上下文中,将它们作为主协程的子协程进行启动):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
获取完整代码 here
输出如下:
foo
foo
BAR!
foo
foo
BAR!
缓冲channel
到目前为止展示的channel都没有缓冲区。当发送方和接收方相遇时,无缓冲的channel传输数据(也称为会和)。如果先调用send,则将其挂起直到receive被调用。反之亦然。
Channel() 工厂方法和 produce 构建器都使用可选的容量参数 capacity
来指定缓冲区大小。缓冲区允许发送发在挂起之前发送多个元素,类似于具有指定容量的 BlockingQueue
,当缓冲区塞满时阻塞。
看看下面代码的行为:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
获取完整代码 here
它使用容量为4的缓冲channel打印了5次 "sending":
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前4个元素被加入到了缓冲区内,然后发送者在尝试发送第5个时挂起了。
Ticker channels
Ticker channel 是一种特殊的会和channel,每次自这个channel上次的消费后、经过给定延迟时间后生产 Unit 。虽然它看起来似乎没有用,但它是一种对于创建复杂的基于时间的 produce pipelines 和 operators 来说有用的构建块,这些 produce pipelines 和 operators 可以进行窗口化和其他一些时间相关的处理。Ticker channel 可以在 select 中用来执行 "on tick" 动作。
使用工厂方法 ticker 来创建这类channel。为了表明不再需要其他元素,使用 ReceiveChannel.cancel 方法。
现在我来看一下在实践中它是如何运作的:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
获取完整代码 here
打印如下行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
请注意,ticker 知道可能的消费者暂停,并且默认情况下,如果发生了暂停,则延迟下个生产的元素,尝试维持生产元素的固定比例。
可选地,可以指定一个值为 TickerMode.FIXED_DELAY 的 mode
参数,来维持元素间的固定延迟。
Channels是公平的
对于从多个协程发起的发送和接收调用,channel对它们的调用顺序是公平,以先进先出为序。例如第一个调用 receive 的协程获得元素。在之后的示例中,两个"ping","pong"协程从共享的"table"channel中接收"ball"对象。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
获取完整代码 here
"ping"协程先启动,所以它第一个接收到ball。即使"ping"协程在发送ball回table后,立即再次开始接收ball,ball还是被"pong"协程接收到了,因为它已经在等待接收了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
请注意,由于正在使用的执行者的特性,有时channel可能会产生看起来不公平的执行结果。详情参阅 this issue 。