【翻译】kotlin协程核心库文档(五)—— 通道

github原文地址

原创翻译,转载请保留或注明出处:https://www.jianshu.com/p/afd359a976e2

通道(Channels)


延迟值提供了一个在协程间传输单个值的便捷方式。Channel 提供了一种传输数据流的方式。

channel基础

Channel 是与 BlockingQueue很相似的概念。一个关键的区别是它具有一个挂起 send 代替阻塞的put 操作,同时具有一个挂起 receive 代替阻塞的take 操作。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

获取完整代码 here

输出结果:

1
4
9
16
25
Done!

关闭和channel迭代

与队列不同,可以通过关闭channel来指示出不再有元素到来。在接收端,通过常规的 for 循环来接收channel里的元素是很方便的。

从概念上来说,一个 close 调用就像往channel里发送一个特殊的关闭token。一旦受到此关闭token,迭代就会停止,这样可以保证收到关闭命令前,先前所有发送的元素都可以被接收到。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

获取完整代码 here

构建channel生产者

协程生成一系列元素的模式很常见。这是“生产者-消费者模式”的一部分,这种模式经常出现在并发代码中。你可以将这样一个生产者抽象为一个用channel作为参数的函数,但这与常规意义上的“结果必须从函数返回”相违背。

在生产者端有一种便利的方式来做这件事:使用名为 produce 的协程构建器。而且消费者端可以用扩展函数 consumeEach 代替for循环。

fun produceSquares() = produce<Int> {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

获取完整代码 here

流水线(Pipelines)

流水线(pipeline)是一种协程生成可能是无限的数据流的模式:

fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

与此同时其他的协程消费这个流,做一些处理,并产生一些其他结果。在以下示例中数值只是被平方:

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

主函数启动并连接整个pipeline:

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

获取完整代码 here

我们不必在此示例应用中取消这些协程,因为 coroutines are like daemon threads 协程就像守护线程,但是在一个更大型的应用中,如果我们不再需要它,那就需要停止我们的pipeline。又或者,我们可以将pipeline协程作为 children of a main coroutine 主协程的子协程运行,如下所示:

使用pipeline生成素数

让我们通过一个使用协程pipeline生成素数的例子将pipeline带到极端。我们从一个无限的数值序列开始,这次我们引入一个显示的 context 参数并将其传给 produce 构建器,所以调用者可以控制我们协程的运行位置:

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

以下的pipeline stage过滤传入的数值流,删除所有可被给定素数整除的数值:

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}

现在我们开始构建一个从2起的数值流pipeline,从当前channel获取素数,然后为每个找到的素数启动新的pipeline stage:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

以下示例打印前十个素数,在主线程的上下文中运行整个pipeline。由于所有协程都是作为主 runBlocking 协程的子协程在其 coroutineContext 上下文中启动的,因为我们不必保留一份明确的清单记录我们所启动的所有协程。我们使用 cancelChildren 扩展函数来取消所有子协程。

fun main(args: Array<String>) = runBlocking<Unit> {
    var cur = numbersFrom(coroutineContext, 2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(coroutineContext, cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

获取完整代码 here

这段代码输出如下:

2
3
5
7
11
13
17
19
23
29

注意一点,你可以使用标注库中的 buildIterator 协程构建器来构建相同的pipeline。用 buildIterator 替代 produceyield 替代 sendnext 替代receiveIterator 替代 ReceiveChannel ,并去除上下文。同时也不需要使用 runBlocking 。然而如上所示,pipeline使用channel的好处是,如果在 CommonPool 上下文中运行它,实际上可以使用多个cpu核心。

无论如何,这是一种极不切实际的寻找素数的方法。实际上,pipeline确实涉及一些其他的挂起调用(比如对远程服务的异步调用),并且这些pipeline不能使用 buildSeqeunce/buildIterator 构建,因为它们不允许任意挂起。这与完全的异步的 produce 有所不同。

扇出(Fan-out)

多个协程可以从同一个channel接收数据,在它们之间分配工作。让我们从一个定时生产整型(每秒10个数)的生产者协程开始:

fun produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

接着我们可以有几个处理协程。在这个示例中,它们只是打印它们的id和接收数值:

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

现在我们启动5个处理器,让它们工作近乎1秒钟,看看会发生什么:

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

获取完整代码 here

输出会和下面这个很类似,尽管接收每个特定整型的处理器id有可能不同:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

注意,取消生产者协程会关闭他的channel,从而最终终止处理器协程正在channel上执行的迭代。

另外,请注意我们在 launchProcessor 代码中如何使用for循环显式迭代channel来运行扇出。与consumeEach 不同,这种for循环模式可以非常安全地在多协程环境中使用。如果其中一个处理器协程崩溃,其他处理器协程仍然会处理这条channel,而通过 consumeEach 编写的处理器总是消费(取消)底层channel,无论是正常还是异常地终止。

扇入(Fan-in)

多个协程也许会发送数据到同一个channel。例如,我们有一个字符串channel和一个挂起函数,这个函数以指定的延迟重复发送一个特定的字符串到这个channel:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

现在,让我们看看如果启动一组发送字符串的协程会发生什么(这个例子中,我们在主线程的上下文中,将它们作为主协程的子协程进行启动):

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<String>()
    launch(coroutineContext) { sendString(channel, "foo", 200L) }
    launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

获取完整代码 here

输出如下:

foo
foo
BAR!
foo
foo
BAR!

缓冲channel

到目前为止展示的channel都没有缓冲区。当发送方和接收方相遇时,无缓冲的channel传输数据(也称为会和)。如果先调用send,则将其挂起直到receive被调用。反之亦然。

Channel() 工厂方法和 produce 构建器都使用可选的容量参数 capacity 来指定缓冲区大小。缓冲区允许发送发在挂起之前发送多个元素,类似于具有指定容量的 BlockingQueue ,当缓冲区塞满时阻塞。

看看下面代码的行为:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch(coroutineContext) { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
}

获取完整代码 here

它使用容量为4的缓冲channel打印了5次 "sending":

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

前4个元素被加入到了缓冲区内,然后发送者在尝试发送第5个时挂起了。

Ticker channels

Ticker channel 是一种特殊的会和channel,每次自这个channel上次的消费后、经过给定延迟时间后生产 Unit 。虽然它看起来似乎没有用,但它是一种对于创建复杂的基于时间的 produce pipelines 和 operators 来说有用的构建块,这些 produce pipelines 和 operators 可以进行窗口化和其他一些时间相关的处理。Ticker channel 可以在 select 中用来执行 "on tick" 动作。

使用工厂方法 ticker 来创建这类channel。为了表明不再需要其他元素,使用 ReceiveChannel.cancel 方法。

现在我来看一下在实践中它是如何运作的:

fun main(args: Array<String>) = runBlocking<Unit> {
    val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

获取完整代码 here

打印如下行:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

请注意,ticker 知道可能的消费者暂停,并且默认情况下,如果发生了暂停,则延迟下个生产的元素,尝试维持生产元素的固定比例。

可选地,可以指定一个值为 TickerMode.FIXED_DELAYmode 参数,来维持元素间的固定延迟。

Channels是公平的

对于从多个协程发起的发送和接收调用,channel对它们的调用顺序是公平,以先进先出为序。例如第一个调用 receive 的协程获得元素。在之后的示例中,两个"ping","pong"协程从共享的"table"channel中接收"ball"对象。

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch(coroutineContext) { player("ping", table) }
    launch(coroutineContext) { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

获取完整代码 here

"ping"协程先启动,所以它第一个接收到ball。即使"ping"协程在发送ball回table后,立即再次开始接收ball,ball还是被"pong"协程接收到了,因为它已经在等待接收了:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

请注意,由于正在使用的执行者的特性,有时channel可能会产生看起来不公平的执行结果。详情参阅 this issue

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351