channel是指定类型的值的线程安全队列, channel的最大用途是goroutines之间进行通信。
goroutines通信时使用ch<-value将值写入channel,使用value<-ch从channel中接收值.
channel基本使用方法:
func genInts(chInts chan int) {
chInts <- rand.Intn(1000)
}
func main() {
chInts := make(chan int)
for i := 0; i < 2; i++ {
go genInts(chInts)
}
n := <-chInts
fmt.Printf("n: %d\n", n)
select {
case n := <-chInts:
fmt.Printf("n: %d\n", n)
}
}
n: 81
n: 887
channel的零值是nil. 写入nil channel会阻塞,所以在使用channel前的第一件事是使用make(chan <type>, <queue size>)
初始化它.Queue size是可选的,默认是0,代表没有缓冲的channel.
使用chan<-value发送数据会将值添加到队列的最后.
如果channel满了, <-操作会阻塞
发送数据到nil channel会永久阻塞.
使用value=<-chan读取数据是从队列的头开始.
如果channel是空的,读取操作会阻塞.
从channel读取数据的另一种方法是使用select语句。
使用select语句可以:
- 等待多个channel
- 完成非阻塞等待
- 实现有延时的等待(timer channel)
channel是有固定的大小.
初始化channel时如果不指定大小(make(chan int)),则它的大小为0,称为无缓冲channel. 发送到无缓冲channel会阻塞,直到有相应的接收完成.
初始化channel时如果指定大小(make(chan int, 3))称为缓冲channel,缓冲大小为3.
前2个发送回立即结束, 第4个发送会阻塞,直到有值从channel中取出.
使用close(chan)关闭channel.
关闭两次会引发panic.
发送数据到关闭的channel会引发panic.
从关闭的channel中读取值会:
- 返回缓冲的值
- 如果没有更多的缓冲值,则立即返回零值
使用range从channel中读取数据
当从channel中读取多个值时,通常会使用range:
func foo(ch chan int) {
ch <- 1
ch <- 2
close(ch)
}
func main() {
ch := make(chan int)
go foo(ch)
for n := range ch {
fmt.Println(n)
}
fmt.Println("channel is now closed")
}
1
2
channel is now closed
channel关闭,循环就会结束
使用工作池时,这是常见的模式:
- 为所有工作创建一个channel
- 启动工作
- 工作使用
v:=range chan
来提取要处理的任务 - 在对所有作业进行排队之后,关闭channel,以便goroutine处理channel中的所有作业
使用select从channel中超时读取
从channel中读取数据时,有时候希望限制等待的时间
使用select可以达到目的:
func main() {
chResult := make(chan int, 1)
go func() {
time.Sleep(1 * time.Second)
chResult <- 5
fmt.Printf("Worker finished")
}()
select {
case res := <-chResult:
fmt.Printf("Got %d from worker\n", res)
case <-time.After(100 * time.Millisecond):
fmt.Printf("Timed out before worker finished\n")
}
}
Timed out before worker finished
让我们看看这是如何工作的:
select {
case res := <-chResult:
fmt.Printf("Got %d from worker\n", res)
case <-time.After(100 * time.Millisecond):
fmt.Printf("Timed out before worker finished\n")
}
time.After returns a channel on which a value will be enqueued after a given time (100 milliseconds in our example). It's worth nothing that it's at least 100 ms and can be more. Let's call it a timeout channel.
we don't care about the value read from timeout channel. We only care that value was sent on the channel
we use select to wait on 2 channels: chResult and a timeout channel
select finishes when receive on one of the 2 channels completes
we either get the value on chResult before timeout expires or we receive the value from timeout channel
关闭channel
使用close(chan)关闭channel.
关闭channel的主要目的是通知worker goroutine他们的工作已经完成并且可以结束。保证了goroutines不会泄露
ch := make(chan string)
go func() {
for s := range ch {
fmt.Printf("received from channel: %s\n", s)
}
fmt.Print("range loop finished because ch was closed\n")
}()
ch <- "foo"
close(ch)
received from channel: foo
从已关闭的channel中读取数据会立即返回零值.
ch := make(chan string)
close(ch)
v := <-ch
fmt.Printf("Receive from closed channel immediately returns zero value of the type: %#v\n", v)
Receive from closed channel immediately returns zero value of the type: ""
判断channel是否关闭:
ch := make(chan int)
go func() {
ch <- 1
close(ch)
}()
v, isOpen := <-ch
fmt.Printf("received %d, is channel open: %v\n", v, isOpen)
v, isClosed = <-ch
fmt.Printf("received %d, is channel open: %v\n", v, isOpen)
received 1, is channel open: true
received 0, is channel open: false
重复关闭channel会引发panic:
ch := make(chan string)
close(ch)
close(ch)
panic: close of closed channel
goroutine 1 [running]:
main.main()
/tmp/src438901704/main.go:9 +0x57
exit status 2
发送数据到关闭的channel引发panic:
ch := make(chan int)
close(ch)
ch <- 5 // panics
panic: send on closed channel
goroutine 1 [running]:
main.main()
/tmp/src194641031/main.go:9 +0x63
exit status 2
是否缓冲
发送和接收goroutines块,除非发送goroutine具有要发送的值,并且接收goroutine已准备好接收。
对每个接收/发送操作坚持同步可能会导致不必要的速度降低。
想象一个场景,一个工人生产,而另一个工人消费。
如果产生一个值要花一秒钟,消耗也要花一秒钟,则要花2秒的时间来产生和消耗一个值。
如果生产者可以在channel中排队,则不必等待消费者为每个值做好准备。
这是缓冲channel的好处。
通过允许生产者独立于消费者进行生产,我们可以加快某些场景:
func producer(ch chan int) {
for i := 0; i < 5; i++ {
if i%2 == 0 {
time.Sleep(10 * time.Millisecond)
} else {
time.Sleep(1 * time.Millisecond)
}
ch <- i
}
}
func consumer(ch chan int) {
total := 0
for i := 0; i < 5; i++ {
if i%2 == 1 {
time.Sleep(10 * time.Millisecond)
} else {
time.Sleep(1 * time.Millisecond)
}
total += <-ch
}
}
func unbuffered() {
timeStart := time.Now()
ch := make(chan int)
go producer(ch)
consumer(ch)
fmt.Printf("Unbuffered version took %s\n", time.Since(timeStart))
}
func buffered() {
timeStart := time.Now()
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
fmt.Printf("Buffered version took %s\n", time.Since(timeStart))
}
func main() {
unbuffered()
buffered()
}
Unbuffered version took 50.854313ms
Buffered version took 32.800727ms
使用select非阻塞接收
您可以使用select语句的默认部分进行非阻塞等待。
func main() {
ch := make(chan int, 1)
end:
for {
select {
case n := <-ch:
fmt.Printf("Received %d from a channel\n", n)
break end
default:
fmt.Print("Channel is empty\n")
ch <- 8
}
// wait for channel to be filled with values
// don't use time.Sleep() like that in production code
time.Sleep(20 * time.Millisecond)
}
}
Channel is empty
Received 8 from a channel
在for循环的第一次迭代中,由于channel为空,因此select立即以default子句结束。
我们将值发送到该通道,以便下一个选择将从通道中获取该值。
信令信道 chan struct{}
有时不想通过channel发送值,而仅将其用作信号事件的一种方式。
信令通道通常用来通知goroutine结束:
func worker(ch chan int, chQuit chan struct{}) {
for {
select {
case v := <-ch:
fmt.Printf("Got value %d\n", v)
case <-chQuit:
fmt.Printf("Signalled on quit channel. Finishing\n")
chQuit <- struct{}{}
return
}
}
}
func main() {
ch, chQuit := make(chan int), make(chan struct{})
go worker(ch, chQuit)
ch <- 3
chQuit <- struct{}{}
// wait to be signalled back by the worker
<-chQuit
}
Got value 3
Signalled on quit channel. Finishing
检查通道是否有可用数据
如果通道中没有数据,则在通道上接收会阻塞。
如果您不想阻止怎么办?
您可能很想在接收之前检查通道是否有数据。
您无法在Go中执行此操作,因为它可能无法正常运行。 在您检查可用性的时间和您收到数据的时间之间,其他一些goroutine可能会获取该值。
如果要避免无限等待,可以使用select添加超时或进行非阻塞等待。
注意
发送数据到nil channel将永久阻塞
package main
func main() {
var ch chan bool
ch <- true // deadlocks because ch is nil
}
通道的未初始化值是nil,因此上述程序会永远阻塞。
从nil channel接收数据将永久阻塞
package main
import "fmt"
func main() {
var ch chan bool
fmt.Printf("Value received from ch is: %v\n", <-ch) // deadlock because c is nil
}
发送数据到关闭的channel引发panic
package main
import (
"fmt"
"time"
)
func main() {
var ch = make(chan int, 100)
go func() {
ch <- 1
time.Sleep(time.Second)
close(ch)
ch <- 1
}()
for i := range ch {
fmt.Printf("i: %d\n", i)
}
}
i: 1
panic: send on closed channelgoroutine 5 [running]:
main.main.func1(0x452000, 0xc99)
/tmp/sandbox307976305/main.go:14 +0xa0
created by main.main
/tmp/sandbox307976305/main.go:10 +0x60
您应该对程序进行架构设计,以使一个发送方控制频道的生存期。
该规则强调:如果只有一个频道发送者,那么确保您永远不会写入封闭的频道没有问题。
如果您有多个发件人,这将变得很困难:如果一个发件人关闭了一个频道,那么其他发件人应该不会崩溃吗?
无需尝试解决上述问题的方法,而是重新设计代码,以使只有一个发送方可以控制通道的生存期。
从已关闭的channel接收数据会立即返回零值
package main
import "fmt"
func main() {
// show
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
for i := 0; i < 3; i++ {
fmt.Printf("%d ", <-ch) // -> 1 2 0
}
// show end
}
很容易补救:
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
// show
for {
v, ok := <-ch
if !ok {
break
}
fmt.Printf("%d ", v) // -> 1 2
}
// show end
}
更好更惯用的方法:
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
// show
for v := range ch {
fmt.Printf("%d ", v) // -> 1 2
}
// show end
}
关闭channel以表明Goroutine已结束
有时我们需要等到goroutine完成。
来自已关闭channel的接收会立即返回,可以通过共享done channel来在goroutine之间进行协调。
一个goroutine在最后关闭通道。另一个goroutine可以无限期地等待,直到接收到chDone <-
在更复杂的情况下,它可以使用select语句从多个渠道接收。 例如,要限制等待时间:
select {
case <- chDone:
// goroutine has finished
case <- time.After(time.Second *5):
// goroutine didn't finish but we don't want to wait
// more than 5 seconds
}
要检查channel是否已关闭(即goroutine已完成)而不等待:
package main
import "fmt"
// show
func checkState(ch chan struct{}) {
select {
case <-ch:
fmt.Printf("channel is closed\n")
default:
fmt.Printf("channel is not closed\n")
}
}
// show end
func main() {
// show
ch := make(chan struct{})
checkState(ch)
close(ch)
checkState(ch)
// show end
}
此技术用于context.Done() channel