Chapter 8 Goroutines and Channels
Go enables two styles of concurrent programming. This chapter presents goroutines and channels, which support communicating sequential processes or CSP, a model of concurrency in which values are passed between independent activities (goroutines) but variables are for the most part confined to a single activity. Chapter 9 covers some aspects of the more traditional model of shared memory multithreading, which will be familiar if you've used threads in other mainstream languages. Chapter 9 also points out some important hazards and pitfalls of concurrent programming that we won't delve into in this chapter.
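In short, Go supports two styles. In CSP, goroutines communicate by passing values to each other, and variables are confined to a single activity.
The other style resembles the multithreading of other mainstream languages and is based on shared memory.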
8.1 Goroutines
In Go, each concurrently executing activity is called a goroutine.
If you have used operating system threads or threads in other languages, then you can assume for now that a goroutine is similar to a thread, and you'll be able to write correct programs. The differences between threads and goroutines are essentially quantitative, not qualitative, and will be described in Section 9.8.
- For now, a goroutine can be thought of as a thread; the differences are quantitative rather than qualitative and are explained in Section 9.8.
When a program starts, its only goroutine is the one that calls the main function, so we call it the main goroutine. New goroutines are created by the go statement. Syntactically, a go statement is an ordinary function or method call prefixed by the keyword go. A go statement causes the function to be called in a newly created goroutine. The go statement itself completes immediately:
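- When a program starts, its only goroutine is the one that calls main: the main goroutine.
- f()    // call f(); wait for it to return
- go f() // create a new goroutine that calls f(); don't wait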
package main
import (
"time"
"fmt"
)
func main() {
go spinner(5 * time.Millisecond)
const n = 45
fibN := fib(n)
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x - 1) + fib(x - 2)
}
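- The \r in the program is a carriage return: it moves the cursor back to the start of the line, so each spinner character overwrites the previous one, and the final result overwrites the spinner.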
Other than by returning from main or exiting the program, there is no programmatic way for one goroutine to stop another, but as we will see later, there are ways to communicate with a goroutine to request that it stop itself.
- In short: one goroutine cannot kill another; it can only communicate with it and ask it to stop itself.
8.2 Example: Concurrent Clock Server
A clock server example.
Server (clock1)
package main
import (
"net"
"log"
"io"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
handleConn(conn)
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
}
The three key steps:
- listener, err := net.Listen("tcp", "localhost:8000")
- conn, err := listener.Accept()
- _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
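Note: the layout string passed to Format must be written in terms of Go's reference time, Mon Jan 2 15:04:05 MST 2006 (mnemonic 1 2 3 4 5 6: month 1, day 2, hour 3 (15), minute 4, second 5, year 2006).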
https://segmentfault.com/q/1010000010976398/a-1020000010982052
On the client side, connect with the nc (netcat) command:
nc localhost 8000
Client (netcat1)
package main
import (
"net"
"log"
"io"
"os"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
_, err = io.Copy(os.Stdout, conn)
if err != nil {
log.Fatal(err)
}
}
Key code snippets:
- Connect:
conn, err := net.Dial("tcp", "localhost:8000")
- Copy the server's output to standard output:
_, err = io.Copy(os.Stdout, conn)
Note: to stop the server running in the background, kill the clock1 process:
killall clock1
8.3. Example: Concurrent Echo Server
The clock server used one goroutine per connection. In this section, we'll build an echo server that uses multiple goroutines per connection.
- Contrast: the clock server used one goroutine per connection; the echo server uses several goroutines per connection.
reverb1
package main
import (
"net"
"log"
"time"
"fmt"
"strings"
"bufio"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, shout)
time.Sleep(delay)
fmt.Fprintln(c, strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1 * time.Second)
}
c.Close()
}
func main() {
l, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := l.Accept()
if err != nil {
log.Print(err)
continue
}
handleConn(conn)
}
}
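- Listen:
l, err := net.Listen("tcp", "localhost:8000")
- Accept a connection:
conn, err := l.Accept()
- Wrap the connection in a scanner that reads its input line by line:
input := bufio.NewScanner(c)
- Advance to the next line of input (false at end of input or on error):
input.Scan()
- Close the net.Conn:
c.Close()
- Write to the net.Conn:
fmt.Fprintln(c, shout)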
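To improve:
The server handles only one request at a time: it must finish echoing one shout before it reads the next, and because handleConn is not started with go, it also serves only one connection at a time.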
netcat2
package main
import (
"net"
"log"
"os"
"io"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
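- Connect with Dial:
conn, err := net.Dial("tcp", "localhost:8000")
- Copy in both directions: mustCopy(dst io.Writer, src io.Reader) copies everything from src to dst, and a net.Conn is both an io.Reader and an io.Writer:
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
- The main goroutine calls mustCopy to copy os.Stdin to conn, while a separate goroutine copies the server's replies from conn to os.Stdout.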
reverb2
package main
import (
"net"
"log"
"bufio"
"time"
"fmt"
"strings"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, shout)
time.Sleep(delay)
fmt.Fprintln(c, strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1 * time.Second)
}
c.Close()
}
func main() {
l, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := l.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
8.4 Channels
If goroutines are the activities of a concurrent Go program, channels are the connections between them. A channel is a communication mechanism that lets one goroutine send values to another goroutine. Each channel is a conduit for values of a particular type, called the channel's element type.
- In short: a channel lets one goroutine send values of a fixed element type to another goroutine.
ch := make(chan int)
As with maps, a channel is a reference to the data structure created by make. When we copy a channel or pass one as an argument to a function, we are copying a reference, so caller and callee refer to the same data structure. As with other reference types, the zero value of a channel is nil.
- Like a map, a channel created by make is a reference: copying a channel or passing it to a function copies the reference, so both sides operate on the same underlying data structure.
- As with other reference types, the zero value of a channel is nil.
Two channels of the same type may be compared using ==. The comparison is true if both are references to the same channel data structure. A channel may also be compared to nil.
- Channels can be compared with ==; the result is true only if both refer to the same underlying channel.
- A channel can also be compared with nil.
A channel has two principal operations, send and receive, collectively known as communications. A send statement transmits a value from one goroutine, through the channel, to another goroutine executing a corresponding receive expression. Both operations are written using the <- operator. In a send statement, the <- separates the channel and value operands. In a receive expression, <- precedes the channel operand. A receive expression whose result is not used is a valid statement.
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
8.4.1. Unbuffered Channels
A send operation on an unbuffered channel blocks the sending goroutine until another goroutine executes a corresponding receive on the same channel, at which point the value is transmitted and both goroutines may continue. Conversely, if the receive operation was attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel.
- An unbuffered channel needs a sender and a receiver at the same time: whichever side arrives first blocks until the other arrives.
Communication over an unbuffered channel causes the sending and receiving goroutines to synchronize. Because of this, unbuffered channels are sometimes called synchronous channels. When a value is sent on an unbuffered channel, the receipt of the value happens before the reawakening of the sending goroutine.
- “happens before”
In discussions of concurrency, when we say x happens before y, we don't mean merely that x occurs earlier in time than y, we mean that it is guaranteed to do so and that all its prior effects, such as updates to variables, are complete and that you may rely on them.
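- "x happens before y" is a guarantee, not just an observation about timing: x is guaranteed to occur first, and all of its effects (such as updates to variables) are complete and can be relied on by y.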
When x neither happens before y nor after y, we say that x is concurrent with y. This doesn't mean that x and y are necessarily simultaneous, merely that we cannot assume anything about their ordering. As we'll see in the next chapter, it's necessary to order certain events during the program's execution to avoid the problems that arise when two goroutines access the same variable concurrently.
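- When x neither happens before nor after y, x and y are concurrent. This does not mean they necessarily happen simultaneously, only that we cannot assume anything about their order. As the next chapter shows, certain events must be ordered to avoid the problems that arise when two goroutines access the same variable concurrently.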
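A minimal sketch of how an unbuffered channel orders events (compute is a hypothetical helper). The goroutine writes result and then sends on done; because a send happens before the corresponding receive completes, every write made before the send is visible to the receiver after <-done, so it may read result with no further synchronization.

```go
package main

import "fmt"

// compute does its work in a background goroutine and uses an unbuffered
// channel both to wait for it and to make its write visible.
func compute() int {
	var result int
	done := make(chan struct{})
	go func() {
		result = 6 * 7     // this write happens before the send below
		done <- struct{}{} // synchronizes with the receive in compute
	}()
	<-done        // after this, the write to result is guaranteed visible
	return result // safe: no data race
}

func main() {
	fmt.Println(compute()) // 42
}
```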
8.4.2. Pipelines
pipeline1
package main
import "fmt"
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
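- If the sender knows that no further values will ever be sent on a channel, it is useful to tell the receivers so they can stop waiting. This is done by closing the channel with the built-in close function: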
package main
import (
"fmt"
"time"
)
func main() {
naturals := make(chan int)
squares := make(chan int)
go func() {
for x := 0; x < 10 ; x++ {
naturals <- x
}
close(naturals)
}()
go func() {
for {
x := <-naturals
squares <- x * x
time.Sleep(time.Second)
}
}()
for {
fmt.Println(<-squares)
}
//close(naturals)
}
- After sending ten natural numbers, the counter closes the naturals channel.
After a channel has been closed, any further send operations on it will panic. After the closed channel has been drained, that is, after the last sent element has been received, all subsequent receive operations will proceed without blocking but will yield a zero value. Closing the naturals channel above would cause the squarer's loop to spin as it receives a never-ending stream of zero values, and to send these zeros to the printer.
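- Once a channel has been closed, any further send on it panics.
- Once all values sent before the close have been received, further receive operations no longer block; they return the zero value immediately.
So how can we stop the pipeline once the naturals channel has been closed?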
There is no way to test directly whether a channel has been closed, but there is a variant of the receive operation that produces two results: the received channel element, plus a boolean value, conventionally called ok, which is true for a successful receive and false for a receive on a closed and drained channel. Using this feature, we can modify the squarer's loop to stop when the naturals channel is drained and close the squares channel in turn.
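- There is no way to test directly whether a channel has been closed.
- But the receive operation has a variant that yields a second result, a boolean conventionally called ok: true means a value was received, false means the channel is closed and drained.
With this, the squarer can stop once naturals has delivered all of its values.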
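A minimal sketch of the two-result receive. It uses a buffered channel (Section 8.4.4) only so that a single goroutine can both send and receive; recvAfterClose is a hypothetical helper.

```go
package main

import "fmt"

// recvAfterClose sends one value, closes the channel, then receives twice.
func recvAfterClose() (v1 int, ok1 bool, v2 int, ok2 bool) {
	ch := make(chan int, 1)
	ch <- 7
	close(ch) // values already in the buffer are still delivered

	v1, ok1 = <-ch // 7 true: a real value
	v2, ok2 = <-ch // 0 false: closed and drained, so the zero value
	return
}

func main() {
	fmt.Println(recvAfterClose()) // 7 true 0 false
}
```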
Two equivalent ways to write the squarer:
// The two-result receive: ok is false once naturals is closed and drained
go func() {
for {
x, ok := <-naturals
if !ok {
break
}
squares <- x * x
}
close(squares)
}()
// The form above is clumsy, so the range form below is preferred; apart from the added Sleep, the two behave the same
go func() {
for x := range naturals {
squares <- x * x
time.Sleep(time.Second)
}
close(squares)
}()
(The book's explanation:)
Because the syntax above is clumsy and this pattern is common, the language lets us use a range loop to iterate over channels too. This is a more convenient syntax for receiving all the values sent on a channel and terminating the loop after the last one.
package main
import (
"fmt"
"time"
)
func main() {
naturals := make(chan int)
squares := make(chan int)
go func() {
for x := 0; x < 3; x++ {
naturals <- x
}
close(naturals)
}()
go func() {
//for x := range naturals {
// squares <- x * x
//}
for {
x := <-naturals
squares <- x * x
time.Sleep(time.Second)
}
}()
for x := range squares {
fmt.Println(x)
}
}
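- The commented-out loop is the second (range) form.
- The output is 0 1 4, followed by 0 forever, one per second.
- Reason: after naturals has been closed and drained, receives from it still succeed but always yield the zero value 0; and since squares is never closed, the printer's range loop never ends.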
package main
import (
"fmt"
"time"
)
func main() {
naturals := make(chan int)
squares := make(chan int)
go func() {
for x := 0; x < 3; x++ {
naturals <- x
}
close(naturals)
}()
go func() {
for x := range naturals {
squares <- x * x
time.Sleep(time.Second)
}
//for {
// x := <-naturals
// squares <- x * x
// time.Sleep(time.Second)
//}
}()
for x := range squares {
fmt.Println(x)
}
}
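- Output:
0
1
4
fatal error: all goroutines are asleep - deadlock!
- Here the for {} loop was replaced by for range. Ranging over a channel receives values until the channel is closed, so the squarer now exits after naturals is closed. But nobody closes squares, so the printer's for range over squares waits for a value that will never be sent, and the runtime reports a deadlock.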
pipeline2
package main
import (
"time"
"fmt"
)
func main() {
naturals := make(chan int)
squares := make(chan int)
go func() {
for x := 0; x < 3; x++ {
naturals <- x
}
close(naturals)
}()
go func() {
for x := range naturals {
squares <- x * x
time.Sleep(time.Second)
}
close(squares)
}()
for x := range squares {
fmt.Println(x)
}
}
- So once naturals has been drained, the squarer closes squares, and the whole pipeline shuts down in order.
- Summary:
a for range loop over a channel receives values one at a time until the channel is closed and drained.
Attempting to close an already-closed channel causes a panic, as does closing a nil channel. Closing channels has another use as a broadcast mechanism, which we'll cover in Section 8.9.
- Closing a channel that is already closed causes a panic.
- Closing a nil channel also causes a panic.
- Closing channels has another use, as a broadcast mechanism, covered in Section 8.9.
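To observe both panics without crashing, a minimal sketch can wrap close in a helper (tryClose is hypothetical) that recovers and returns the runtime's panic message.

```go
package main

import "fmt"

// tryClose closes ch and returns "ok", or the panic message if close panics.
func tryClose(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r) // the runtime's panic value, as text
		}
	}()
	close(ch)
	return "ok"
}

func main() {
	ch := make(chan int)
	fmt.Println(tryClose(ch)) // ok
	fmt.Println(tryClose(ch)) // close of closed channel

	var nilCh chan int         // zero value: nil
	fmt.Println(tryClose(nilCh)) // close of nil channel
}
```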
8.4.3. Unidirectional Channel Types
To document this intent and prevent misuse, the Go type system provides unidirectional channel types that expose only one or the other of the send and receive operations. The type chan<- int, a send-only channel of int, allows sends but not receives. Conversely, the type <-chan int, a receive-only channel of int, allows receives but not sends. (The position of the <- arrow relative to the chan keyword is a mnemonic.) Violations of this discipline are detected at compile time.
pipeline3
package main
import (
"fmt"
)
func counter(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
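- The parameter in is the receiving side of a stage and has the receive-only type <-chan int.
- The parameter out is the sending side of a stage and has the send-only type chan<- int.
- In main, the calls to counter, squarer and printer implicitly convert the bidirectional chan int values to these unidirectional types; there is no conversion in the other direction.
8.4.4. Buffered Channels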
A buffered channel has a queue of elements. The queue's maximum size is determined when it is created, by the capacity argument to make.
ch = make(chan string, 3)
A send operation on a buffered channel inserts an element at the back of the queue, and a receive operation removes an element from the front. If the channel is full, the send operation blocks its goroutine until space is made available by another goroutine's receive. Conversely, if the channel is empty, a receive operation blocks until a value is sent by another goroutine.
- A send on a buffered channel appends a value to the queue; if the channel is full, the send blocks the sending goroutine (not the channel) until another goroutine receives.
- cap returns the channel's buffer capacity.
- len returns the number of elements currently buffered.
- len is of limited use, since in a concurrent program the value is likely to be stale as soon as it is retrieved, but it can help with fault diagnosis and performance optimization.
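A minimal sketch of the queue behavior and of cap and len, using a hypothetical queueDemo helper on a buffered channel of capacity 3.

```go
package main

import "fmt"

// queueDemo partly fills a buffered channel and inspects it.
func queueDemo() (capacity, before int, first string, after int) {
	ch := make(chan string, 3)
	ch <- "A" // sends append to the back of the queue
	ch <- "B"
	capacity, before = cap(ch), len(ch) // 3, 2
	first = <-ch                        // "A": receives take from the front
	after = len(ch)                     // 1
	return
}

func main() {
	fmt.Println(queueDemo()) // 3 2 A 1
}
```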
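The mirroredQuery function below makes parallel requests to three mirrors and sends the responses on a buffered channel, so it may return the quickest response before the two slower mirrors have even responded. (Incidentally, it is quite normal for several goroutines to send values to the same channel concurrently, or to receive from the same channel; these patterns are commonly called fan-in and fan-out.)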
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
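- Had we used an unbuffered channel, the two slower goroutines would have got stuck trying to send their responses on a channel from which no goroutine will ever receive. This is called a goroutine leak, and it is a bug. Unlike garbage variables, leaked goroutines are not automatically collected, so it is important to make sure that goroutines terminate themselves when they are no longer needed.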
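A runnable sketch of mirroredQuery, assuming a hypothetical request that does not contact the hosts at all: it only sleeps for made-up per-host delays and returns the host name. The channel's capacity of 3 is what lets the two slower goroutines complete their sends and exit even though nobody receives their values.

```go
package main

import (
	"fmt"
	"time"
)

// request is a hypothetical stand-in for a network call.
func request(hostname string) string {
	delays := map[string]time.Duration{ // made-up latencies
		"asia.gopl.io":     30 * time.Millisecond,
		"europe.gopl.io":   10 * time.Millisecond,
		"americas.gopl.io": 50 * time.Millisecond,
	}
	time.Sleep(delays[hostname])
	return hostname
}

func mirroredQuery() string {
	responses := make(chan string, 3) // room for every response: no leak
	go func() { responses <- request("asia.gopl.io") }()
	go func() { responses <- request("europe.gopl.io") }()
	go func() { responses <- request("americas.gopl.io") }()
	return <-responses // return the quickest response
}

func main() {
	fmt.Println(mirroredQuery())
}
```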
8.5 Looping in Parallel
In this section, we'll explore some common concurrency patterns for executing all the iterations of a loop in parallel.
package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g., "foo.thumb.jpg".
func ImageFile(infile string) (string, error)
- The above is the thumbnail package we will use; only the signature of ImageFile is shown.
embarrassingly parallel
Obviously the order in which we process the files doesn't matter, since each scaling operation is independent of all the others. Problems like this that consist entirely of subproblems that are completely independent of each other are described as embarrassingly parallel. Embarrassingly parallel problems are the easiest kind to implement concurrently and enjoy performance that scales linearly with the amount of parallelism.
- Embarrassingly parallel problems are the easiest kind to make concurrent, and their performance scales linearly with the amount of parallelism.
Let's execute all these operations in parallel, thereby hiding the latency of the file I/O and using multiple CPUs for the image-scaling computations. Our first attempt at a concurrent version just adds a go keyword. We'll ignore errors for now and address them later.
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
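- This version runs very fast because makeThumbnails2 returns (and the main goroutine may exit) without waiting for the goroutines to finish.
- There is no direct way to wait for a goroutine to finish, but we can change the inner goroutine to report its completion to the outer goroutine by sending an event on a shared channel.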
package main
import (
"fmt"
"time"
)
func main() {
s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
fmt.Println(s)
for _, e := range s {
go func() {
fmt.Println(e)
}()
}
time.Sleep(time.Second * 5)
}
Output (one run):
5
5
5
5
11
35
35
35
35
package main
import (
"fmt"
"time"
)
func main() {
s := []int{6, 3, 9, 7, 10, 2, 5, 11, 35}
fmt.Println(s)
for _, e := range s {
go func(e int) {
fmt.Printf("%d ", e)
}(e)
}
time.Sleep(time.Second * 5)
}
Output (one run):
[6 3 9 7 10 2 5 11 35]
6 3 5 11 2 9 35 7 10
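- In the first version, all the anonymous functions share the single loop variable e, so one might expect every goroutine to print 35.
- In practice they don't, because the goroutines run concurrently: some of them execute fmt.Println(e) before the loop has reached its last iteration.
- The second version passes e to the anonymous function as an argument, so each goroutine gets its own copy and every value in the slice is printed.
- Since Go 1.22, each iteration of a for loop has its own loop variable, so when built with Go 1.22 or later the first version also prints every value (in arbitrary order).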
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err // NOTE: incorrect: goroutine leak!
}
}
return nil
}
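- This program has a subtle bug. When it encounters the first non-nil error, it returns the error to the caller, leaving no goroutine draining the errors channel. Each remaining worker goroutine then blocks forever when it tries to send on that channel, and never terminates. This situation, a goroutine leak (§8.4.4), may cause the whole program to get stuck or to run out of memory.
- Solutions:
- Use a buffered channel with enough capacity for every worker.
- Alternatively, create another goroutine to drain the channel while the main goroutine returns the first error.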
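A sketch of the second option, keeping an unbuffered channel. The process function is a hypothetical stand-in for thumbnail.ImageFile that fails for names starting with "bad". On the first error, processAll hands the remaining receives to a drainer goroutine before returning, so no worker stays blocked on its send.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// process is hypothetical: it fails for any name that starts with "bad".
func process(name string) error {
	if strings.HasPrefix(name, "bad") {
		return errors.New("cannot process " + name)
	}
	return nil
}

func processAll(names []string) error {
	errc := make(chan error) // unbuffered on purpose
	for _, name := range names {
		go func(name string) { errc <- process(name) }(name)
	}
	for i := range names {
		if err := <-errc; err != nil {
			remaining := len(names) - (i + 1) // results not yet received
			go func() {
				for j := 0; j < remaining; j++ {
					<-errc // discard, letting the sending worker exit
				}
			}()
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(processAll([]string{"a.jpg", "bad.jpg", "c.jpg"})) // cannot process bad.jpg
	fmt.Println(processAll([]string{"a.jpg", "c.jpg"}))            // <nil>
}
```

The buffered-channel fix below is simpler; the drainer is useful when the channel must stay unbuffered.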
// Fixing the problem with a buffered channel
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
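- wg.Add(1) increments the WaitGroup counter before each worker starts.
- wg.Done() decrements it when a worker finishes (it is equivalent to wg.Add(-1)).
- Note how the closer goroutine waits for all the workers to finish before closing the sizes channel. The two operations, wait and close, must run concurrently with the loop over sizes. Consider the alternatives: if the wait were placed in the main goroutine before the loop, it would never end, because the workers would block sending on the unbuffered sizes channel that nobody is yet receiving from; if placed after the loop, it would be unreachable, since with nothing closing the channel, the loop would never terminate.
8.6. Example: Concurrent Web Crawler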
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
//!-crawl
//!+main
func main() {
worklist := make(chan []string)
// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
The main function resembles breadthFirst. As before, a work list records the queue of items that need processing, each item being a list of URLs to crawl, but this time, instead of representing the queue using a slice, we use a channel. Each call to crawl occurs in its own goroutine and sends the links it discovers back to the work list.
- The main function is similar to breadthFirst (breadth-first traversal) from Section 5.6. As before, the worklist is a queue of items to process, each item being a list of URLs to crawl, but this time the queue is a channel rather than a slice.
- Note that the goroutine that calls crawl receives link as an explicit parameter, to avoid the problem of capturing the loop variable.
The program is too parallel. Unbounded parallelism is rarely a good idea since there is always a limiting factor in the system, such as the number of CPU cores for compute-bound workloads, the number of spindles and heads for local disk I/O operations, the bandwidth of the network for streaming downloads, or the serving capacity of a web service. ... A simple way to do that in our example is to ensure that no more than n calls to links.Extract are active at once, where n is comfortably less than the file descriptor limit (20, say). This is analogous to the way a doorman at a crowded nightclub admits a guest only when some other guest leaves.
- Unbounded parallelism is rarely a good idea, because every system has some limiting factor.
- To fix this, we can limit the program's use of resources to suit its environment.
We can limit parallelism using a buffered channel of capacity n to model a concurrency primitive called a counting semaphore. Conceptually, each of the n vacant slots in the channel buffer represents a token entitling the holder to proceed. Sending a value into the channel acquires a token, and receiving a value from the channel releases a token, creating a new vacant slot. This ensures that at most n sends can occur without an intervening receive. (Although it might be more intuitive to treat filled slots in the channel buffer as tokens, using vacant slots avoids the need to fill the channel buffer after creating it.) Since the channel element type is not important, we'll use struct{}, which has size zero.
- A buffered channel used this way is a counting semaphore, much like a semaphore in an operating system.
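A self-contained sketch of a counting semaphore, with simulated work instead of links.Extract. The maxConcurrent helper and its numbers are hypothetical; the sync.Mutex (Chapter 9) is used only to record how many goroutines are inside the limited section at once, so we can check that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrent starts jobs goroutines, lets at most limit of them into the
// "work" section at once, and returns the highest concurrency observed.
func maxConcurrent(jobs, limit int) int {
	tokens := make(chan struct{}, limit) // each vacant slot is a token
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex // protects running and peak (bookkeeping only)
		running int
		peak    int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tokens <- struct{}{}        // acquire: blocks while every slot is taken
			defer func() { <-tokens }() // release: frees a slot on return

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", maxConcurrent(50, 3))
}
```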
crawl2
package main
import (
"fmt"
"log"
"os"
"gopl.io/ch5/links"
)
//!+sema
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token
list, err := links.Extract(url)
<-tokens // release the token
if err != nil {
log.Print(err)
}
return list
}
//!-sema
//!+
func main() {
worklist := make(chan []string)
var n int // number of pending sends to worklist
// Start with the command-line arguments.
n++
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for ; n > 0; n-- {
list := <-worklist
for _, link := range list {
if !seen[link] {
seen[link] = true
n++
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
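- The tokens channel in crawl ensures that at most 20 calls to links.Extract are active at once.
- The counter n tracks pending sends to worklist, so the main loop ends once there is no work left; the previous version never terminated, even after all links had been crawled.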
func main() {
worklist := make(chan []string) // lists of URLs, may have duplicates
unseenLinks := make(chan string) // de-duplicated URLs
// Add command-line arguments to worklist.
go func() { worklist <- os.Args[1:] }()
// Create 20 crawler goroutines to fetch each unseen link.
for i := 0; i < 20; i++ {
go func() {
for link := range unseenLinks {
foundLinks := crawl(link)
go func() { worklist <- foundLinks }()
}
}()
}
// The main goroutine de-duplicates worklist items
// and sends the unseen ones to the crawlers.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
unseenLinks <- link
}
}
}
}
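- This version uses the original crawl function without a counting semaphore; instead, 20 long-lived crawler goroutines ensure that at most 20 HTTP requests are active concurrently.
- The first for loop starts the 20 crawler goroutines, which receive links from unseenLinks.
- The second for loop, in the main goroutine, de-duplicates the worklist and sends each URL not yet seen to the crawlers over unseenLinks.
8.7. Multiplexing with select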
countdown1
package main
import (
"fmt"
"time"
)
func launch() {
fmt.Println("Lift off!")
}
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
- time.Tick returns a channel on which it sends events periodically, acting like a metronome.
countdown2
package main
import (
"fmt"
"os"
"time"
)
func launch() {
fmt.Println("Lift off!")
}
func main() {
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
fmt.Println("Commencing countdown. Press return to abort.")
select {
case <-time.After(10 * time.Second):
fmt.Println("10s has passed.")
case <-abort:
fmt.Println("Launch aborted!")
return
}
launch()
}
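- Each case specifies a communication (a send or receive on some channel) and an associated block of statements.
- A receive expression may appear on its own, without assigning the received value to a variable.
- A select with no cases, select{}, waits forever.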
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
time.Sleep(1 * time.Second)
case ch <- i:
}
}
}
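- ch has a buffer of size 1, so it is alternately empty and full, and only one case can proceed on each iteration: the send when i is even, the receive when i is odd. The program therefore prints 0 2 4 6 8.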
package main
import (
"os"
"fmt"
"time"
)
func main() {
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
fmt.Println("Commencing countdown, Press return")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown >0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
fmt.Println("tick")
case <-abort:
fmt.Println("Launch aborted!")
return
}
}
launch()
}
func launch() {
fmt.Println("Lift off!")
}
Sometimes we want to try to send or receive on a channel but avoid blocking if the channel is not ready -- a non-blocking communication. A select statement can do that too. A select may have a default, which specifies what to do when none of the other communications can proceed immediately.
- The default case keeps the select from blocking when no other case is ready.
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
The select statement above receives a value from the abort channel if there is one to receive; otherwise it does nothing. This is a non-blocking receive operation; doing it repeatedly is called polling a channel.
- In short: a select with a default case performs a non-blocking send or receive, and repeating it in a loop polls the channel.
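A minimal sketch of non-blocking send and receive with select and default. The trySend and tryRecv helpers are hypothetical names; the channel has a buffer of 1 so the outcomes are deterministic.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it happened.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // no room and no receiver ready: give up immediately
	}
}

// tryRecv attempts a receive without blocking.
func tryRecv(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false // nothing ready: do not wait
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
	fmt.Println(tryRecv(ch))    // 1 true
	fmt.Println(tryRecv(ch))    // 0 false: the buffer is empty
}
```

Note that on a closed channel tryRecv would report true with the zero value, because a receive from a closed channel is always ready.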
The zero value for a channel is nil. Perhaps surprisingly, nil channels are sometimes useful. Because send and receive operations on a nil channel block forever, a case in a select statement whose channel is nil is never selected. This lets us use nil to enable or disable cases that correspond to features like handling timeouts or cancellation, responding to other input events, or emitting output. We'll see an example in the next section.
- A nil channel can be useful: sends and receives on a nil channel block forever, so a select case on a nil channel is never selected.
- This lets us use nil to enable or disable cases, for example for timeouts and cancellation or for other input and output events. The next section shows an example.
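A minimal sketch of disabling a select case with nil: merge (a hypothetical helper) receives from two channels until both are closed, setting each one to nil once it is drained so its case can never be selected again. The source helper is also hypothetical and just emits its arguments.

```go
package main

import "fmt"

// merge collects values from a and b until both are closed.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // closed: disable this case
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil // closed: disable this case
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

// source sends vals on a new channel and then closes it.
func source(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		for _, v := range vals {
			ch <- v
		}
		close(ch)
	}()
	return ch
}

func main() {
	fmt.Println(len(merge(source(1, 2, 3), source(10, 20)))) // 5
}
```

The interleaving of the two inputs varies from run to run; only the set of values is fixed.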
8.8. Example: Concurrent Directory Traversal
du1
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"io/ioutil"
)
func main() {
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
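- dirents calls ioutil.ReadDir(dir), which returns a slice of os.FileInfo describing every file and subdirectory in dir. (ioutil.ReadDir is deprecated since Go 1.16 in favor of os.ReadDir, which returns os.DirEntry values instead.)
- walkDir is recursive: for each entry, if it is a directory, walkDir recurses into it; otherwise it sends the file's size on fileSizes.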
The program would be nicer if it kept us informed of its progress. However, simply moving the printDiskUsage call into the loop would cause it to print thousands of lines of output.
The variant of du below prints the totals periodically, but only if the -v flag is specified since not all users will want to see progress messages. The background goroutine that loops over roots remains unchanged. The main goroutine now uses a ticker to generate events every 500ms, and a select statement to wait for either a file size message, in which case it updates the totals, or a tick event, in which case it prints the current totals. If the -v flag is not specified, the tick channel remains nil, and its case in the select is effectively disabled.
- If -v is not specified, tick stays nil, so its case in the select is disabled: a neat use of a nil channel.
- Reporting progress periodically is useful, but simply moving printDiskUsage into the loop would print thousands of lines.
du2
package main
// The du2 variant uses select and a time.Ticker
// to print the totals periodically if -v is set.
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
)
//!+
var verbose = flag.Bool("v", false, "show verbose progress messages")
func main() {
// ...start background goroutine...
//!-
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
//!+
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
//!-
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
Since the program no longer uses a range loop, the first select case must explicitly test whether the fileSizes channel has been closed, using the two-result form of receive operation. If the channel has been closed, the program breaks out of the loop. The labeled break statement breaks out of both the select and the for loop; an unlabeled break would break out of only the select, causing the loop to begin the next iteration.
- This version can no longer range over fileSizes; it uses the two-result receive size, ok := <-fileSizes instead.
- However, it still takes a long time to finish.
However, it still takes too long to finish. There's no reason why all the calls to walkDir can't be done concurrently, thereby exploiting parallelism in the disk system. The third version of du, below, creates a new goroutine for each call to walkDir. It uses a sync.WaitGroup to count the number of calls to walkDir that are still active, and a closer goroutine to close the fileSizes channel when the counter drops to zero.
sync.WaitGroup in Go
First, what WaitGroup is for: it waits for a collection of goroutines to finish, blocking the goroutine that calls Wait (here the main goroutine) until all of them are done.
package main
import (
"time"
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
fmt.Println(i)
go func(n int) {
defer wg.Done() // same as wg.Add(-1): mark this goroutine as finished
number(n)
}(i)
}
wg.Wait()
}
func number(i int) {
time.Sleep(time.Second)
fmt.Println(i)
}
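- sync.WaitGroup is one of Go's synchronization tools; channels and sync.Mutex are others. It works as a counter, not a queue. Add(n) increases the count of outstanding tasks, Done decrements it when a task finishes, and Wait blocks until the count reaches zero, so the program cannot continue before all tasks are complete.
- A limitation of sync.WaitGroup is that it only counts goroutines and cannot cap how many run at the same time. Limiting concurrency needs a separate mechanism, such as the counting semaphore used in dirents below.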
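The third version of du relies on two pieces working together. A WaitGroup counts the active senders, and a closer goroutine closes the results channel once that count reaches zero. Both can be sketched on their own. Here worker and totalSize are hypothetical stand-ins for walkDir and du's main loop, and string lengths stand in for file sizes.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for walkDir: it sends the length of
// each string it was given on out.
func worker(items []string, wg *sync.WaitGroup, out chan<- int64) {
	defer wg.Done() // decrement the counter when this worker returns
	for _, s := range items {
		out <- int64(len(s))
	}
}

// totalSize starts one worker per batch and sums everything they send.
func totalSize(batches [][]string) (count int, total int64) {
	out := make(chan int64)
	var wg sync.WaitGroup
	for _, b := range batches {
		wg.Add(1) // call Add before starting the goroutine, not inside it
		go worker(b, &wg, out)
	}
	// Closer goroutine: once every worker has called Done, close out so the
	// range loop below terminates.
	go func() {
		wg.Wait()
		close(out)
	}()
	for size := range out {
		count++
		total += size
	}
	return count, total
}

func main() {
	n, total := totalSize([][]string{{"a", "bb"}, {"ccc"}, {}})
	fmt.Println(n, total) // 3 6
}
```

The closer runs in its own goroutine because calling wg.Wait in main before the range loop would deadlock. The workers would block sending on out, since nothing is receiving yet.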
package main
import (
"flag"
"sync"
"time"
"fmt"
"path/filepath"
"os"
"io/ioutil"
)
var vFlag = flag.Bool("v", false, "show verbose progress messages")
func main() {
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
var tick <-chan time.Time
if *vFlag {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
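// sema is a counting semaphore that limits dirents to at most
// 20 concurrent directory reads.
var sema = make(chan struct{}, 20)
func dirents(dir string) []os.FileInfo {
sema <- struct{}{}        // acquire token
defer func() { <-sema }() // release token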
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
return entries
}
Since this program creates many thousands of goroutines at its peak, we have to change dirents to use a counting semaphore to prevent it from opening too many files at once, just as we did for the web crawler in Section 8.6.
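- Because this version creates thousands of goroutines, dirents uses the buffered channel sema as a counting semaphore. At most 20 goroutines can be reading a directory at once, so the program never has too many files open.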
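Here is a minimal sketch of the counting semaphore at work. It assumes a hypothetical runLimited helper and uses a short sleep in place of the directory read. It starts many goroutines and records the peak number holding a token at the same time. That peak should never exceed the buffer capacity of the semaphore channel.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per task but lets at most limit of them
// into the "work" section at a time, using a buffered channel as a counting
// semaphore (the same idea as sema in dirents). It returns the highest
// number of goroutines observed in the work section simultaneously.
func runLimited(tasks, limit int) int64 {
	sema := make(chan struct{}, limit)
	var inFlight, maxSeen int64
	var wg sync.WaitGroup
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sema <- struct{}{}        // acquire a token; blocks while limit tokens are held
			defer func() { <-sema }() // release the token
			cur := atomic.AddInt64(&inFlight, 1)
			for { // record the peak with a compare-and-swap loop
				old := atomic.LoadInt64(&maxSeen)
				if cur <= old || atomic.CompareAndSwapInt64(&maxSeen, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated I/O, e.g. reading a directory
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&maxSeen)
}

func main() {
	fmt.Println(runLimited(100, 5) <= 5) // true
}
```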
This version runs several times faster than the previous one, though there is a lot of variability from system to system.
8.9 Cancellation
Sometimes we need to instruct a goroutine to stop what it is doing, for example, in a web server performing a computation on behalf of a client that has disconnected.
There is no way for one goroutine to terminate another directly, since that would leave all its shared variables in undefined states. In the rocket launch program (Section 8.7) we sent a single value on a channel named abort, which the countdown goroutine interpreted as a request to stop itself. But what if we need to cancel two goroutines, or an arbitrary number?
Recall that after a channel has been closed and drained of all sent values, subsequent receive operations proceed immediately, yielding zero values. We can exploit this to create a broadcast mechanism: don't send a value on the channel, close it.
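Here is a minimal sketch of close used as a broadcast, with a hypothetical broadcastStop helper. A single close(done) releases every goroutine blocked on <-done. A single send on done would release only one of them.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastStop starts n workers that each block until done is closed,
// closes done once, and counts how many workers observed the close.
func broadcastStop(n int) int {
	done := make(chan struct{})
	var (
		mu      sync.Mutex
		stopped int
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	close(done) // one close is seen by every current and future receiver
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println(broadcastStop(10)) // 10
}
```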
package main
import (
"os"
"fmt"
"time"
)
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
go func() {
os.Stdin.Read(make([]byte, 1)) // block until the user types something
close(done)                    // broadcast cancellation to every receiver
}()
for i := 0; ; i++ {
if cancelled() {
return
}
fmt.Println(i)
time.Sleep(time.Second)
}
}
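- This relies on the fact that once a channel has been closed (and drained), every further receive from it returns immediately with the zero value of its element type.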
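The rule can be checked directly. In this sketch, receiveAll is an illustrative helper. Values already buffered in a closed channel are still delivered first. Only after the channel is drained do receives return the zero value, with ok set to false.

```go
package main

import "fmt"

// receiveAll performs n receives on ch and records each value and ok flag.
func receiveAll(ch <-chan int, n int) (vals []int, oks []bool) {
	for i := 0; i < n; i++ {
		v, ok := <-ch
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

func main() {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)
	vals, oks := receiveAll(ch, 4)
	fmt.Println(vals, oks) // [7 8 0 0] [true true false false]
}
```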
Next, we create a goroutine that will read from the standard input, which is typically connected to the terminal. As soon as any input is read (for instance, the user presses the return key), this goroutine broadcasts the cancellation by closing the done channel.
package main
import (
"os"
"sync"
"time"
"path/filepath"
"fmt"
)
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
roots := os.Args[1:]
if len(roots) == 0 {
roots = []string{"."}
}
go func() {
os.Stdin.Read(make([]byte, 1))
close(done)
}()
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
tick := time.Tick(500 * time.Millisecond)
var nfiles, nbytes int64
loop:
for {
select {
case <-done:
// Drain fileSizes so that walkDir goroutines blocked on a send can finish.
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
var sema = make(chan struct{}, 20)
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
case <-done:
return nil
}
defer func() { <-sema }()
f, err := os.Open(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
defer f.Close()
entries, err := f.Readdir(0)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
}
return entries
}
Now we need to make our goroutines respond to the cancellation. In the main goroutine, we add a third case to the select statement that tries to receive from the done channel. The function returns if this case is ever selected, but before it returns it must first drain the fileSizes channel, discarding all values until the channel is closed. It does this to ensure that any active calls to walkDir can run to completion without getting stuck sending to fileSizes.
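Why draining matters can be shown without a file system. In this sketch, cancelAndDrain and its producers are hypothetical stand-ins for du's main loop and walkDir. The consumer stops early, as if cancelled. The final range loop over the channel then lets every blocked producer complete its send. As a result, the WaitGroup reaches zero and the closer goroutine can close the channel.

```go
package main

import (
	"fmt"
	"sync"
)

// cancelAndDrain starts producers that each send perProducer values on an
// unbuffered channel. The consumer keeps only the first keep values, as if
// cancelled, then drains the channel so no producer stays blocked on a send.
// It returns how many values were discarded.
func cancelAndDrain(producers, perProducer, keep int) (discarded int) {
	out := make(chan int)
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				out <- i // blocks until someone receives
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out) // reached only once every producer has finished sending
	}()
	for i := 0; i < keep; i++ {
		<-out
	}
	// "Cancelled": without this drain the remaining producers would block
	// forever and the closer goroutine would never close out.
	for range out {
		discarded++
	}
	return discarded
}

func main() {
	fmt.Println(cancelAndDrain(4, 5, 3)) // 17
}
```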
It might be profitable to poll the cancellation status again within walkDir's loop, to avoid creating goroutines after the cancellation event. Cancellation involves a trade-off: a quicker response often requires more intrusive changes to program logic. Ensuring that no expensive operations ever occur after the cancellation event may require updating many places in your code, but often most of the benefit can be obtained by checking for cancellation in a few important places.
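One way to make a producer loop respond more quickly is sketched here with a hypothetical generate function rather than walkDir. The loop checks done on every iteration and makes the send itself cancellable with select. This sits at the more intrusive end of the trade-off described above, because every blocking operation in the loop has to be wrapped.

```go
package main

import "fmt"

// generate sends 0, 1, 2, ... on the returned channel until done is closed.
// Each send is wrapped in a select, so a producer that would otherwise block
// on a send nobody receives still notices cancellation and returns.
func generate(done <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-done:
				return // cancelled: stop producing and close out
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	nums := generate(done)
	for i := 0; i < 3; i++ {
		fmt.Println(<-nums) // 0, then 1, then 2
	}
	close(done)
	for range nums { // wait until the producer sees done and closes nums
	}
	fmt.Println("producer stopped")
}
```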
A little profiling of this program revealed that the bottleneck was the acquisition of a semaphore token in dirents. The select below makes this operation cancellable and reduces the typical cancellation latency of the program from hundreds of milliseconds to tens:
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // acquire token
case <-done:
return nil // cancelled
}
defer func() { <-sema }() // release token
// ...read directory...
}
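The same select can be packaged as a helper. This sketch assumes a hypothetical acquire function. It first does a non-blocking check of done. When both cases of a select are ready, Go picks one at random, so without that check a caller that was already cancelled could still sometimes acquire a token.

```go
package main

import "fmt"

// acquire takes a token from the counting semaphore sema unless done is
// closed first. It reports whether a token was acquired; the caller must
// release it with <-sema only when acquire returned true.
func acquire(sema chan struct{}, done <-chan struct{}) bool {
	select {
	case <-done:
		return false // already cancelled: don't try for a token
	default:
	}
	select {
	case sema <- struct{}{}:
		return true
	case <-done:
		return false // cancelled while waiting for a token
	}
}

func main() {
	sema := make(chan struct{}, 1)
	done := make(chan struct{})
	fmt.Println(acquire(sema, done)) // true: a token was free
	close(done)
	fmt.Println(acquire(sema, done)) // false: cancelled
}
```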
Now, when cancellation occurs, all the background goroutines quickly stop and the main function returns. Of course, when main returns, a program exits, so it can be hard to tell a main function that cleans up after itself from one that does not. There's a handy trick we can use during testing: if instead of returning from main in the event of cancellation, we execute a call to panic, then the runtime will dump the stack of every goroutine in the program. If the main goroutine is the only one left, then it has cleaned up after itself. But if other goroutines remain, they may not have been properly cancelled, or perhaps they have been cancelled but the cancellation takes time; a little investigation may be worthwhile. The panic dump often contains sufficient information to distinguish these cases.
8.10 Chat Server
We'll finish this chapter with a chat server that lets several users broadcast textual messages to each other. There are four kinds of goroutine in this program. There is one instance apiece of the main and broadcaster goroutines, and for each client connection there is one handleConn and one clientWriter goroutine. The broadcaster is a good illustration of how select is used, since it has to respond to three different kinds of messages.
The job of the main goroutine, shown below, is to listen for and accept incoming network connections from clients. For each one, it creates a new handleConn goroutine, just as in the concurrent echo server we saw at the start of this chapter.
package main
import (
"bufio"
"fmt"
"log"
"net"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
Next is the broadcaster. Its local variable clients records the current set of connected clients. The only information recorded about each client is the identity of its outgoing message channel, about which more later.
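type client chan<- string // an outgoing message channel
var (
entering = make(chan client) // announces newly connected clients
leaving  = make(chan client) // announces disconnected clients
messages = make(chan string) // all incoming client messages
)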
func broadcaster() {
clients := make(map[client]bool)
for {
select {
case msg := <-messages:
// Broadcast the message to every client's outgoing channel.
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}
The broadcaster listens on the global entering and leaving channels for announcements of arriving and departing clients. When it receives one of these events, it updates the clients set, and if the event was a departure, it closes the client's outgoing message channel. The broadcaster also listens for events on the global messages channel, to which each client sends all its incoming messages. When the broadcaster receives one of these events, it broadcasts the message to every connected client.
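The broadcaster's logic can be exercised without any network connections. This sketch reuses the names client and broadcaster. As an assumption for testability, it passes entering, leaving, and messages as parameters instead of using package-level variables, and it gives each simulated client a buffered channel. With the real unbuffered client channels, a client that stops reading would stall the broadcaster for everyone.

```go
package main

import "fmt"

type client chan<- string // an outgoing message channel

// broadcaster mirrors the chat server's version, with its channels passed in.
func broadcaster(entering, leaving <-chan client, messages <-chan string) {
	clients := make(map[client]bool)
	for {
		select {
		case msg := <-messages:
			for cli := range clients {
				cli <- msg
			}
		case cli := <-entering:
			clients[cli] = true
		case cli := <-leaving:
			delete(clients, cli)
			close(cli) // lets the client's reader finish its range loop
		}
	}
}

// simulate connects two in-memory clients, broadcasts one message, then
// disconnects both, returning what each client received.
func simulate(msg string) (gotA, gotB []string) {
	entering := make(chan client)
	leaving := make(chan client)
	messages := make(chan string)
	go broadcaster(entering, leaving, messages)

	a := make(chan string, 10) // buffered so the broadcaster never blocks here
	b := make(chan string, 10)
	entering <- a
	entering <- b
	messages <- msg
	leaving <- a // processed only after msg was delivered to both clients
	leaving <- b
	for m := range a {
		gotA = append(gotA, m)
	}
	for m := range b {
		gotB = append(gotB, m)
	}
	return gotA, gotB
}

func main() {
	fmt.Println(simulate("hi")) // [hi] [hi]
}
```

The ordering is deterministic because every send to the broadcaster is unbuffered. Each send completes only when the broadcaster receives it, and the broadcaster finishes handling one event before it selects the next.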
Now let's look at the per-client goroutines. The handleConn function creates a new outgoing message channel for its client and announces the arrival of this client to the broadcaster over the entering channel. Then it reads every line of text from the client, sending each line to the broadcaster over the global incoming message channel, prefixing each message with the identity of its sender. Once there is nothing more to read from the client, handleConn announces the departure of the client over the leaving channel and closes the connection.
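// Channel direction types (note). make(<-chan int) is legal but useless,
// since nothing could ever send on it; instead, a bidirectional channel
// converts implicitly to either directional type.
var (
readWrite            = make(chan int) // bidirectional: send and receive
readOnly  <-chan int = readWrite      // receive-only view of the same channel
writeOnly chan<- int = readWrite      // send-only view of the same channel
)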
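Directional channel types are most useful in function signatures, as with the client type and clientWriter's ch parameter. In this sketch, producer and consumer are illustrative names. The compiler rejects any receive in producer and any send or close in consumer, while a bidirectional channel converts implicitly to either type at the call site.

```go
package main

import "fmt"

// producer may only send on out; a receive such as <-out would not compile.
func producer(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing a send-only channel is allowed
}

// consumer may only receive from in; in <- 1 or close(in) would not compile.
func consumer(in <-chan int) (sum int) {
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	ch := make(chan int)      // bidirectional
	go producer(ch, 4)        // ch converts to chan<- int
	fmt.Println(consumer(ch)) // ch converts to <-chan int; prints 10
}
```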
func handleConn(conn net.Conn) {
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()
leaving <- ch
messages <- who + " has left"
conn.Close()
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
}
}