【Kotlin】Channel简介

1 前言

Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        var iterator = channel.iterator()
        while (iterator.hasNext()) {
            var element = iterator.next()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in channel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channel 中 produce 和 actor

produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

fun main() {
    var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
        repeat(3) {
            println("send: $it")
            send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in receiveChannel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

fun main() {
    var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
        repeat(3) {
            var element = receive()
            println("receive: $element")
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            println("send: $it")
            sendChannel.send(it)
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

5 Channel 的关闭

对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

fun main() {
    var channel = Channel<Int>(3)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
        channel.close()
        println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            var element = channel.receive()
            println("receive: $element")
        }
        println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            for (element in channel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

fun main() {
    var broadcastChannel = BroadcastChannel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

fun main() {
    var channel = Channel<Int>()
    var broadcastChannel = channel.broadcast(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

打印如下。

send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

声明:本文转自【Kotlin】Channel简介

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容