22.Go channel和select

channel是指定类型的值的线程安全队列, channel的最大用途是goroutines之间进行通信。

goroutines通信时使用ch<-value将值写入channel,使用value<-ch从channel中接收值.

channel基本使用方法:

func genInts(chInts chan int) {
    chInts <- rand.Intn(1000)
}

func main() {
    chInts := make(chan int)
    for i := 0; i < 2; i++ {
        go genInts(chInts)
    }
    n := <-chInts
    fmt.Printf("n: %d\n", n)

    select {
    case n := <-chInts:
        fmt.Printf("n: %d\n", n)
    }
}

n: 81
n: 887

channel的零值是nil. 写入nil channel会阻塞,所以在使用channel前的第一件事是使用make(chan <type>, <queue size>)初始化它.Queue size是可选的,默认是0,代表没有缓冲的channel.

使用chan<-value发送数据会将值添加到队列的最后.

如果channel满了, <-操作会阻塞

发送数据到nil channel会永久阻塞.

使用value=<-chan读取数据是从队列的头开始.

如果channel是空的,读取操作会阻塞.

从channel读取数据的另一种方法是使用select语句。

使用select语句可以:

  • 等待多个channel
  • 完成非阻塞等待
  • 实现有延时的等待(timer channel)

channel是有固定的大小.

初始化channel时如果不指定大小(make(chan int)),则它的大小为0,称为无缓冲channel. 发送到无缓冲channel会阻塞,直到有相应的接收完成.

初始化channel时如果指定大小(make(chan int, 3))称为缓冲channel,缓冲大小为3.

前2个发送回立即结束, 第4个发送会阻塞,直到有值从channel中取出.

使用close(chan)关闭channel.

关闭两次会引发panic.

发送数据到关闭的channel会引发panic.

从关闭的channel中读取值会:

  • 返回缓冲的值
  • 如果没有更多的缓冲值,则立即返回零值

使用range从channel中读取数据

当从channel中读取多个值时,通常会使用range:

func foo(ch chan int) {
    ch <- 1
    ch <- 2
    close(ch)
}

func main() {
    ch := make(chan int)
    go foo(ch)
    for n := range ch {
        fmt.Println(n)
    }
    fmt.Println("channel is now closed")
}

1
2
channel is now closed

channel关闭,循环就会结束

使用工作池时,这是常见的模式:

  • 为所有工作创建一个channel
  • 启动工作
  • 工作使用v:=range chan来提取要处理的任务
  • 在对所有作业进行排队之后,关闭channel,以便goroutine处理channel中的所有作业

使用select从channel中超时读取

从channel中读取数据时,有时候希望限制等待的时间

使用select可以达到目的:

func main() {
    chResult := make(chan int, 1)

    go func() {
        time.Sleep(1 * time.Second)
        chResult <- 5
        fmt.Printf("Worker finished")
    }()

    select {
    case res := <-chResult:
        fmt.Printf("Got %d from worker\n", res)
    case <-time.After(100 * time.Millisecond):
        fmt.Printf("Timed out before worker finished\n")
    }
}

Timed out before worker finished

让我们看看这是如何工作的:

select {
    case res := <-chResult:
        fmt.Printf("Got %d from worker\n", res)
    case <-time.After(100 * time.Millisecond):
        fmt.Printf("Timed out before worker finished\n")
}

time.After returns a channel on which a value will be enqueued after a given time (100 milliseconds in our example). It's worth nothing that it's at least 100 ms and can be more. Let's call it a timeout channel.
we don't care about the value read from timeout channel. We only care that value was sent on the channel
we use select to wait on 2 channels: chResult and a timeout channel
select finishes when receive on one of the 2 channels completes
we either get the value on chResult before timeout expires or we receive the value from timeout channel

关闭channel

使用close(chan)关闭channel.

关闭channel的主要目的是通知worker goroutine他们的工作已经完成并且可以结束。保证了goroutines不会泄露

ch := make(chan string)

go func() {
    for s := range ch {
        fmt.Printf("received from channel: %s\n", s)
    }
    fmt.Print("range loop finished because ch was closed\n")
}()

ch <- "foo"
close(ch)

received from channel: foo

从已关闭的channel中读取数据会立即返回零值.

ch := make(chan string)
close(ch)
v := <-ch
fmt.Printf("Receive from closed channel immediately returns zero value of the type: %#v\n", v)

Receive from closed channel immediately returns zero value of the type: ""

判断channel是否关闭:

ch := make(chan int)
go func() {
    ch <- 1
    close(ch)
}()
v, isOpen := <-ch
fmt.Printf("received %d, is channel open: %v\n", v, isOpen)
v, isClosed = <-ch
fmt.Printf("received %d, is channel open: %v\n", v, isOpen)

received 1, is channel open: true
received 0, is channel open: false

重复关闭channel会引发panic:

ch := make(chan string)
close(ch)
close(ch)

panic: close of closed channel

goroutine 1 [running]:
main.main()
/tmp/src438901704/main.go:9 +0x57
exit status 2

发送数据到关闭的channel引发panic:

ch := make(chan int)
close(ch)
ch <- 5 // panics

panic: send on closed channel

goroutine 1 [running]:
main.main()
/tmp/src194641031/main.go:9 +0x63
exit status 2

是否缓冲

发送和接收goroutines块,除非发送goroutine具有要发送的值,并且接收goroutine已准备好接收。

对每个接收/发送操作坚持同步可能会导致不必要的速度降低。

想象一个场景,一个工人生产,而另一个工人消费。

如果产生一个值要花一秒钟,消耗也要花一秒钟,则要花2秒的时间来产生和消耗一个值。

如果生产者可以在channel中排队,则不必等待消费者为每个值做好准备。

这是缓冲channel的好处。

通过允许生产者独立于消费者进行生产,我们可以加快某些场景:

func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        if i%2 == 0 {
            time.Sleep(10 * time.Millisecond)
        } else {
            time.Sleep(1 * time.Millisecond)
        }
        ch <- i
    }
}

func consumer(ch chan int) {
    total := 0
    for i := 0; i < 5; i++ {
        if i%2 == 1 {
            time.Sleep(10 * time.Millisecond)
        } else {
            time.Sleep(1 * time.Millisecond)
        }
        total += <-ch
    }
}

func unbuffered() {
    timeStart := time.Now()
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
    fmt.Printf("Unbuffered version took %s\n", time.Since(timeStart))
}

func buffered() {
    timeStart := time.Now()
    ch := make(chan int, 5)
    go producer(ch)
    consumer(ch)
    fmt.Printf("Buffered version took %s\n", time.Since(timeStart))
}

func main() {
    unbuffered()
    buffered()
}

Unbuffered version took 50.854313ms
Buffered version took 32.800727ms

使用select非阻塞接收

您可以使用select语句的默认部分进行非阻塞等待。

func main() {
    ch := make(chan int, 1)

end:
    for {
        select {
        case n := <-ch:
            fmt.Printf("Received %d from a channel\n", n)
            break end
        default:
            fmt.Print("Channel is empty\n")
            ch <- 8
        }
        // wait for channel to be filled with values
        // don't use time.Sleep() like that in production code
        time.Sleep(20 * time.Millisecond)
    }
}

Channel is empty
Received 8 from a channel

在for循环的第一次迭代中,由于channel为空,因此select立即以default子句结束。

我们将值发送到该通道,以便下一个选择将从通道中获取该值。

信令信道 chan struct{}

有时不想通过channel发送值,而仅将其用作信号事件的一种方式。

信令通道通常用来通知goroutine结束:

func worker(ch chan int, chQuit chan struct{}) {
    for {
        select {
        case v := <-ch:
            fmt.Printf("Got value %d\n", v)
        case <-chQuit:
            fmt.Printf("Signalled on quit channel. Finishing\n")
            chQuit <- struct{}{}
            return
        }
    }
}
func main() {
    ch, chQuit := make(chan int), make(chan struct{})
    go worker(ch, chQuit)
    ch <- 3
    chQuit <- struct{}{}

    // wait to be signalled back by the worker
    <-chQuit
}

Got value 3
Signalled on quit channel. Finishing

检查通道是否有可用数据

如果通道中没有数据,则在通道上接收会阻塞。

如果您不想阻止怎么办?

您可能很想在接收之前检查通道是否有数据。

您无法在Go中执行此操作,因为它可能无法正常运行。 在您检查可用性的时间和您收到数据的时间之间,其他一些goroutine可能会获取该值。

如果要避免无限等待,可以使用select添加超时或进行非阻塞等待。

注意

发送数据到nil channel将永久阻塞

package main

func main() {
    var ch chan bool
    ch <- true // deadlocks because ch is nil
}

通道的未初始化值是nil,因此上述程序会永远阻塞。

从nil channel接收数据将永久阻塞

package main

import "fmt"

func main() {
        var ch chan bool
        fmt.Printf("Value received from ch is: %v\n", <-ch) // deadlock because c is nil
}

发送数据到关闭的channel引发panic

package main

import (
    "fmt"
    "time"
)

func main() {
    var ch = make(chan int, 100)
    go func() {
        ch <- 1
        time.Sleep(time.Second)
        close(ch)
        ch <- 1
    }()
    for i := range ch {
        fmt.Printf("i: %d\n", i)
    }
}

i: 1
panic: send on closed channel

goroutine 5 [running]:
main.main.func1(0x452000, 0xc99)
/tmp/sandbox307976305/main.go:14 +0xa0
created by main.main
/tmp/sandbox307976305/main.go:10 +0x60

您应该对程序进行架构设计,以使一个发送方控制频道的生存期。

该规则强调:如果只有一个频道发送者,那么确保您永远不会写入封闭的频道没有问题。

如果您有多个发件人,这将变得很困难:如果一个发件人关闭了一个频道,那么其他发件人应该不会崩溃吗?

无需尝试解决上述问题的方法,而是重新设计代码,以使只有一个发送方可以控制通道的生存期。

从已关闭的channel接收数据会立即返回零值

package main

import "fmt"

func main() {
    // show
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)
    for i := 0; i < 3; i++ {
        fmt.Printf("%d ", <-ch) // -> 1 2 0
    }
    // show end
}

很容易补救:

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)
    // show
    for {
        v, ok := <-ch
        if !ok {
            break
        }
        fmt.Printf("%d ", v) // -> 1 2
    }
    // show end
}

更好更惯用的方法:

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)
    // show
    for v := range ch {
        fmt.Printf("%d ", v) // -> 1 2
    }
    // show end
}

关闭channel以表明Goroutine已结束

有时我们需要等到goroutine完成。

来自已关闭channel的接收会立即返回,可以通过共享done channel来在goroutine之间进行协调。

一个goroutine在最后关闭通道。另一个goroutine可以无限期地等待,直到接收到chDone <-

在更复杂的情况下,它可以使用select语句从多个渠道接收。 例如,要限制等待时间:

select {
    case <- chDone:
        // goroutine has finished
    case <- time.After(time.Second *5):
        // goroutine didn't finish but we don't want to wait
        // more than 5 seconds
}

要检查channel是否已关闭(即goroutine已完成)而不等待:

package main

import "fmt"

// show
func checkState(ch chan struct{}) {
    select {
    case <-ch:
        fmt.Printf("channel is closed\n")
    default:
        fmt.Printf("channel is not closed\n")
    }
}
// show end

func main() {
    // show
    ch := make(chan struct{})
    checkState(ch)
    close(ch)
    checkState(ch)
    // show end
}

此技术用于context.Done() channel

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容

  • Go语言并发模型 Go 语言中使用了CSP模型来进行线程通信,准确说,是轻量级线程goroutine之间的通信。C...
    副班长国伟阅读 2,076评论 0 2
  • Chapter 8 Goroutines and Channels Go enable two styles of...
    SongLiang阅读 1,578评论 0 3
  • 单纯地将函数并发执行是没有意义的,函数与函数之间需要交换数据才能体现并发执行函数的作用。虽然可使用共享内存进行数据...
    JunChow520阅读 421评论 0 2
  • 本篇文章内容基于go1.14.2分析 golang的chan是一个内置类型,作为csp编程的核心数据结构,其底层数...
    litesky阅读 276评论 0 0
  • Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(c...
    空即是色即是色即是空阅读 2,338评论 0 1