Channel 是什么?
channel,通道,本质上是一个通信对象,goroutine 之间可以使用它来通信。从技术上讲,通道是一个数据传输管道,可以向通道写入或从中读取数据。
定义一个channel
Go 规定: 使用
chan
关键字来创建通道,使用make
关键字初始化通道,一个通道只能传输一种类型的数据。
上面的程序定义一个通道c
变量,它可以传输类型为int的数据。上面的程序打印 <nil>, 是因为通道的零值是 nil。但是 nil 通道不能传输数据。因此,我们必须使用 make 函数来创建一个可用的通道。
我们使用简写语法 :=
和 make
函数创建通道。上述程序产生以下结果:
type of `c` is chan int
value of `c` is 0xc0420160c0
注意通道的值 c
,看起来它是一个内存地址。
通道是指针类型。某些场景下,goroutine 之间通信时,会将通道作为参数传递给函数或方法,当程序接收该通道作为参数时,无需取消引用它即可从该通道推送或拉取数据。
通道数据读写
Go规定:使用左箭头语法
<-
从通道读取和写入数据。
- 写入数据
c <- data
上面的示例表示:将data发送到通道 c 中。看箭头的方向,它从data指向c, 因此可以想象”我们正在尝试将data推送到 c”。
- 读取数据
<- c
上述示例表示:从通道 c 读取数据。看箭头方向,从 c 通道开始。
该语句不会将数据推存到任何内容中,但它仍然是一个有效的语句。
如果您有一个变量data,可以保存来自通道的数据,则可以使用以下语法:
var data int
data = <- c
现在来自通道 c 的 int 类型的数据可以存储到变量data中, data 必须也是int类型。
上面的语法可以使用简写语法重写,如下所示
data := <- c
golang 自动识别c中的数据类型,并将data设置为同样的类型
以上所有通道操作都是阻塞的
在上一课中,我们使用 time.Sleep 阻塞了goroutine,从而调度了其它的goroutine。 而通道操作本质上也是阻塞的,当向通道写入数据时,当前goroutine 会被阻塞,直到其它goroutine 从该通道读取数据。我们在并发章节中看到,当前的goroutine阻塞后, 调度器会调度其它空闲的goroutine继续工作,从而保证程序不会永远阻塞,这是用channel来做的。通道的这个特性在 goroutines 通信中非常有用,因为它可以防止我们编写手动锁和 hack 来使它们彼此协同工作。
channel 实践
下面我们一步一步的讲讲解上面程序的执行过程:
- 首先声明了greet函数,它接受字符串类型的通道c。greeter这个函数从通道 c 读取数据,并打印到控制台。
- 在 main 函数中,第一条语句: 打印 "main started" 到控制台
- main函数第二条语句:使用 make 初始化字符串类型的通道 c
- main 函数第三条语句中: 将通道c 传递给 greet 函数,但使用 go 关键字将其作为 goroutine 执行。
- 此时,进程有 2 个 goroutine,而活动goroutine 是
main goroutine
(查看上一课就知道它是什么)。然后控制权转到下一行代码。 - main 第四行语句:将字符串值 "John" 发送到通道 c。此时,goroutine 被阻塞,直到某个 goroutine 读取它。 Go调度器调度
greet goroutine
,它按照第一点中提到的那样执行。 - 然后
main goroutine
激活并执行最后的语句,打印 "main stopped"。
deadlock
前面我们说了“通道本质上是阻塞的”,当在通道写入或读取数据时,该 goroutine 会阻塞,而且会一直阻塞,控制权被传递给其他可用的 goroutine。如果没有其他可用的 goroutine 怎么办,程序无法向下执行了,这就产生死锁错误,导致整个程序崩溃。
当尝试从某个通道读取数据,但通道没有可用的值时,会期望其它 goroutine 推送值到该通道, 而阻塞当前goroutine,交出控制权,因此,此读取操作将是阻塞的。同样,如果要向通道发送数据,会交出控制权到其它goroutine,直到某个 goroutine 从中读取数据。因此,此发送操作将被阻塞。
死锁的一个简单示例是只有主 goroutine 执行一些通道操作。
上面的程序将在运行时死锁了, 抛出以下错误:
main() started
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
program.Go:10 +0xfd
exit status 2
fatal error: all goroutines are asleep - deadlock!
。似乎所有 goroutine 都处于睡眠状态,或者根本没有其他 goroutine 可用于调度。
关闭channel
一个通道可以关闭,这样就不能再通过它发送数据了。接收器 goroutine 可以使用 val, ok := <- c
语法找出通道的状态,如果通道打开或可以执行读取操作,则 ok 为真,如果通道关闭且无法执行更多读取操作,则为 false 。
可以使用
close(channel)
的close
内置函数关闭通道。
让我们看一个简单的例子。
只是为了帮助你理解阻塞的概念,首先发送操作
c <- "John"
是阻塞的,一些 goroutine 必须从通道读取数据,因此greet goroutine
是由 Go调度器调度的。然后第一次读取操作<-c
是非阻塞的,因为数据存在于通道 c 中以供读取。第二次读取操作<-c
将被阻塞,因为通道 c 没有任何数据可供读取,因此 Go 调度器激活main goroutine
并且程序从close(c)
函数开始执行。
从上面的错误中,是由尝试在关闭的通道上发送数据引起的。为了更好地理解关闭通道的可用性,让我们看看 for 循环。
for loop
for{} 的无限循环语法可用于读取通过通道发送的多个值。
在上面的例子中,我们正在创建 goroutine squares,它一个一个地返回从 0 到 9 的数字的平方。在main goroutine 中,我们正在无限循环中读取这些数字。
在无限 for 循环中,由于我们需要一个条件来在某个时刻中断循环,因此我们使用语法 val, ok := <-c
从通道读取值。在这里,当通道关闭时,ok 会给我们额外的信息。因此,在 squares 协程中,在写完所有数据后,我们使用语法 close(c)
关闭通道。当 ok 为真时,程序打印 val 中的值和通道状态 ok。当它为假时,我们使用 break 关键字跳出循环。因此,上述程序产生以下结果:
main() started
0 true
1 true
4 true
9 true
16 true
25 true
36 true
49 true
64 true
81 true
0 false <-- loop broke!
main() stopped
当通道关闭时,goroutine 读取的值为通道数据类型的零值。在这种情况下,由于 channel 正在传输 int 数据类型,因此可以从结果中看到它是 0。与向通道读取或写入值不同,关闭通道不会阻塞当前的 goroutine。
为了避免手动检查通道关闭条件的痛苦,Go 提供了更简单的
for range
循环,它会在通道关闭时自动关闭。
让我们修改我们之前的上述程序。
在上面的程序中,我们使用了 for val := range c
而不是 for{}
。 range 将一次从通道读取一个值,直到它关闭。因此,上述程序产生以下结果
main() started
0
1
4
9
16
25
36
49
64
81
main() stopped
如果不关闭 for range 循环中的通道,程序将在运行时抛出死锁致命错误。所以,数据发送完成时,要记得关闭通道;还可以使用select,default来避免这个问题。
缓冲区通道和通道容量
正如我们所见,每一个通道写入操作都会阻塞当前goroutine。但是到目前位置,在创建channel时,我们都没有给make第二个参数。这第二个参数就是通道容量,或缓冲区。默认为0的通道成为无缓冲区通道。
当通道的缓冲区大于0时,缓冲区被填满之前, goroutine不会被阻塞。缓冲区满后, 只有当通道的最后数据被读取,新的值才能被添加。有一个问题是通道读取是饥渴操作,这表示读取一旦开始,只要缓冲区有数据, 读取就会一直进行。技术上来讲,缓冲区为空时,读取操作才是阻塞的。
我们使用下面的语法定义带缓冲的通道:
c := make(chan Type, n)
上面会创建一个数据类型为 Type 且缓冲区大小为 n 的通道。直到 channel 收到 n+1 个发送操作,它才会阻塞当前的 goroutine。
在上面的程序中,通道 c 的缓冲区容量为 3。这意味着它可以容纳 3 个值,在第20行,由于缓冲区没有溢出(因为我们没有推送任何新值),main goroutine 不会阻塞,运行后退出,并不会调度到squares goroutine。
让我们在发送一个额外的值:
如我们前面讨论, 通道 c <- 4 发送操作超出了channel的容量,阻塞了main goroutine, squares goroutine 获得控制权,并读取通道内的所有值。
一个通道的长度和容量怎么计算呢
与切片类似,缓冲通道具有长度和容量。通道的长度是通道缓冲区中值的数量,而通道的容量是缓冲区大小,创建时n的值。计算长度,我们使用len函数,而找出容量,我们使用cap函数,就像切片一样
如果你想知道为什么上面的程序运行良好并且没有抛出死锁错误。这是因为,由于通道容量为 3 并且缓冲区中只有 2 个值可用,Go 没有尝试通过阻止主 goroutine 执行来调度另一个 goroutine。如果需要,您可以简单地在main goroutine 中读取这些值,因为即使缓冲区未满,也不会阻止您从通道读取值。
另外一个例子:
使用多个 goroutine
下面我们创建2个goroutines, 一个计算整数的平方, 一个计算整数的立方
下面分析一下程序的执行过程:
- 首先创建了两个函数,
square
和cube
, 两个函数都使用c chan int
channel 作为参数, 函数从c
中读取整数, 计算完成后,写回c
中 - 在main goroutinue 中,我们创建了两个int 类型的 channel :
squareChan
和cubeChan
- 使用
go
关键字, 以goroutine的方式 square 和 cube - 此时控制权还在 main goroutine中, 我们个变量
testNum
一个值3 - 此时我们把
testNum
发送到channelsquareChan
和cubeChan
, main goroutine 将被阻塞,直到这些channel的数据被读取。一旦chanel中的数据被读取,main goroutine 将继续执行。 - 此时在main goroutine中, 尝试从
squareChan
和cubeChan
读取数据, 这依然是阻塞操作, 直到这些channel在他们各自的goroutine被写入数据, main gorounine 才能继续执行
上图中程序的执行结果如下:
[main] main() started
[main] sent testNum to squareChan
[square] reading
[main] resuming
[main] sent testNum to cubeChan
[cube] reading
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3 is 36
[main] main() stopped
单向通道
至此, 我们所操作的channel都是双向的, 既能读取,也能写入。我们也可以创建单向操作的channel, 只读channel:只能读取数据;只写chanel:只能写入数据。
单向chanel创建依然使用make
函数,只是额外添加了单向箭头(<-
)语法:
roc := make(<-chan int) // read-only chan
soc := make(chan<- int) // send-only chan
在上面的程序中, roc是只读channel, make函数中的箭头方向是远离chan(<-chan
); soc是只写channel, make函数中箭头方向,指向chan(chan<-
), 他们是两个不同的类型
但是单向信道有什么用呢?使用单向通道增加了程序的类型安全性。可减少程序出错概率。
假如有如下场景: 假如你有一个goroutine, 你只需要在其中读取channel中的数据, 但是main goroutine需要在同一个channle读取和写入数据,该怎么做呢?
幸运的是go 提供了简单的语法, 把双向的channle,改为单向
如上述示例所示, 我们只需要在 greet 函数中, 将接收参数修改为单向channel即可,现在我们在greet中,对channel的操作,只能读了, 任何写造作都会导致 fatal 错误 "invalid operation: roc <- "some text" (send to receive-only type <-chan string)"。
匿名goroutine
在 goroutines 章节中,我们学习了匿名 goroutines。我们也可以与他们一起实施渠道。让我们修改前面的简单示例,在匿名 goroutine 中实现 channel。
这是我们之前的例子
下面是一个修改后的例子,我们将 greet goroutine 变成了一个匿名 goroutine。
channel 作为 channel 的数据类型
如标题所示, channel 作为 golang中类型中国的一等公民, 可以像其他值一样在任何地方使用: 作为结构的元素, 函数参数,返回值,甚至是另外一个通道的类型。下面的例子中,我们使用一个通道作为另外一个通道的数据类型。
select
select 就像没有任何输入参数的 switch 一样,但它只用于通道操作。 select 语句用于仅对多个通道中的一个执行操作,由 case 块有条件地选择。
我们先看一个例子:
从上面的程序中,可以看到select语句就像switch一样,但不是布尔操作,而是通道操作。 select 语句是阻塞的,除非它有default项。 一旦满足条件之一, 它将解除阻塞。 那么它什么时候满足条件呢?
如果所有 case 语句(通道操作)都被阻塞,则 select 语句将等待,直到其中一个 case 语句(其通道操作)解除阻塞,然后执行该 case。如果部分或全部通道操作是非阻塞的,则将随机选择非阻塞情况之一并立即执行。
为了解释上面的程序,我们启动了 2 个具有独立通道的 goroutine。然后启动了 2 个case的 select 语句。一种情况从 chan1 读取值,另一种情况从 chan2 读取值。由于这些通道是无缓冲的,读操作将被阻塞(写操作也是如此)。所以这两种选择的情况都是阻塞的。因此 select 将等待,直到其中一种情况变为非阻塞。
当程序运行到 select
代码段时, main goroutine 会阻塞, 然后它将调度 select 语句中存在的所有 goroutine, 每次一个,这个例子里面是service1
和 service2
对应的goroutine,service1
将会等待3s,然后, 写入一条数据到 chan1 解除阻塞, service2
等待5s,写入一条数据到chan2, 然后解除阻塞。由于 service1 比 service2 更早解除阻塞,case 1 将首先解除阻塞,因此将执行该 case,而其他 case(此处为 case 2)将被忽略。完成案例执行后,主函数的执行将继续进行。
上面的程序模拟了真实世界的 Web 服务,其中负载均衡器收到数百万个请求,并且必须从可用服务之一返回响应。使用 goroutines、channels 和 select,我们可以向多个服务请求响应,并且可以使用快速响应的服务。
为了模拟所有情况何时都阻塞并且响应几乎同时可用,我们可以简单地删除 Sleep 调用。
上述程序产生以下结果(您可能会得到不同的结果):
main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs
有时候也可能是:
main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs
发生这种情况是因为 chan1 和 chan2 操作几乎同时发生,但执行和调度仍然存在一些时间差。
default case
和 switch 语句一样,select 语句也有 default 项。 default 是非阻塞的:default case 使得 select 语句总是非阻塞的。这意味着,任何通道(缓冲或非缓冲)上的发送和接收操作始终是非阻塞的。
如果某个值在任何通道上可用,则 select 将执行该情况。如果没有,它将立即执行默认情况。
在上面的程序中,由于通道是无缓冲的,并且两个通道操作的值都不是立即可用的,因此将执行默认情况。如果上面的 select 语句没有 default case,select 就会阻塞并且响应会有所不同。
由于有default的情况下select 是非阻塞的,main goroutine 不会阻塞,调度程序也不会调用其他 goroutine, service1 和 service2 并不会执行。但是我们可以手动调用 time.Sleep 来阻塞 main goroutine。这样,所有其他的 goroutine 都会执行并死亡,将控制权返回给 main goroutine,它会在一段时间后唤醒。当main gorountine唤醒时,通道将立即有可用的值。
上述程序的执行结果:
main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s
也可能是:
main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s
死锁
当没有通道可用于发送或接收数据时,default case
很有用。为了避免死锁,我们可以使用 default case。这是可能的,因为默认情况下的所有通道操作都是非阻塞的,如果数据不是立即可用,Go 不会安排任何其他 goroutine 将数据发送到通道。
与接收类似,在发送操作中,如果其他 goroutine 处于休眠状态(未准备好接收值),则执行 default case。
nil channel
众所周知,通道的默认值为 nil。因此我们不能在 nil 通道上执行发送或接收操作。一旦在 select 语句中使用 nil 通道时,它会抛出以下错误之一或两个错误。
从上面的结果我们可以看出,select(no cases)意味着select语句实际上是空的,因为忽略了带有nil channel的cases。但是由于空的 select{} 语句阻塞了主 goroutine 并且 service goroutine 被安排在它的位置,nil 通道上的通道操作会抛出 chan send (nil chan) 错误。为了避免这种情况,我们default情况。
上面的程序不仅忽略了 case 块,而且立即执行了 default 语句。因此调度程序没有时间来调度 service goroutine。但这真是糟糕的设计。应该始终检查通道的 nil 值。
添加超时
上面的程序不是很有用,因为只执行default case。但有时,我们想要的是任何可用的服务都应该在理想的时间内做出响应,如果没有,则应该执行 default case。这可以通过使用在定义的时间后解除阻塞的通道操作的情况来完成。此通道操作由时间包的 After 函数提供。让我们看一个例子。
上面的程序,在 2 秒后产生以下结果。
main() started 0s
No response received 2.0010958s
main() stopped 2.0010958s
在上面的程序中, <-time.After(2 * time.Second)
2s 后解除阻塞,并返回时间,但在这里,我们对其返回值不感兴趣。由于它也像一个 goroutine,我们有 3 个 goroutine, time.After 是第一个解除阻塞的channel。因此,对应于该 goroutine 操作的 case 被执行。
这很有用,因为您不想等待来自可用服务的响应太长时间,因为用户必须等待很长时间才能从服务中获取任何信息。如果我们在上面的例子中添加 10 * time.Second,来自 service1 的响应将被打印出来,我想现在很明显了。
empty select
与 for{} 空循环一样,空的 select{} 语法也是有效的,但有一个问题。{}正如我们所知,select 语句会被阻塞,直到其中一个 case 解除阻塞,并且由于没有可用的 case 语句来解除阻塞,主 goroutine 将永远阻塞,从而导致死锁。
在上面的程序中,我们知道 select 会阻塞 main goroutine,调度器会调度另一个可用的 goroutine,即 service。但在那之后,它会死掉,调度必须调度另一个可用的 goroutine,但由于主例程被阻塞,没有其他 goroutine 可用,导致死锁。
main() started
Hello from service!
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
program.Go:16 +0xba
exit status 2
waitgroup
让我们想象一种情况,您需要知道所有 goroutine 是否都完成了它们的工作。这与 select 你只需要一个条件为真的情况有点相反,但在这里你需要所有条件都为真才能解除main goroutine 的阻塞。这里的条件是通道操作成功。
WaitGroup 是一个带有计数器值的结构体,它跟踪产生了多少 goroutine 以及有多少已经完成了它们的工作。当这个计数器达到零时,意味着所有的 goroutine 都完成了他们的工作。
让我们深入研究一个示例:
在上面的程序中,创建了一个类型为 sync.WaitGroup
的空结构(带有零值字段)wg。 WaitGroup 结构体有禁止导出的字段,例如 noCopy、state1 和 sema,我们不需要知道它们的内部实现。这个结构有三个方法,即 Add
, Wait
和 Done
。
Add
方法需要一个 int 参数,它是 WaitGroup 计数器的增加量。计数器只不过是一个默认值为 0 的整数。它保存了正在运行的 goroutine 的数量。当 WaitGroup 创建时,它的计数器值为 0,我们可以通过使用 Add 方法传递 delta 作为参数来增加它。请记住,当 goroutine 启动时,计数器不会递增,因此我们需要手动递增它。
Wait
方法用于阻塞当前的gorountine。一旦计数器达到 0,该 goroutine 将解除阻塞。因此,我们需要一些东西来减少计数器。
Done
方法就是用来递减计数器的。它不接受任何参数,因此它只将计数器减 1。
在上面的程序中,创建 wg 后,我们运行了 3 次 for 循环。在每一轮中,我们启动了 1 个 goroutine 并将计数器加 1。这意味着,现在我们有 3 个 goroutine 等待执行,WaitGroup 计数器为 3。请注意,我们在 goroutine 中传递了一个指向 wg 的指针。这是因为在 goroutine 中,一旦我们完成了 goroutine 应该做的任何事情,我们需要调用 Done 方法来递减计数器。如果 wg 作为值传递,则 main 中的 wg 不会递减。这是很明显的。
在 for 循环执行完毕后,我们仍然没有将控制权交给其他 goroutine。这是通过调用wg 的 Wait方法来完成的。这将阻塞 main goroutine,直到计数器达到 0。一旦计数器达到 0,因为从 3 个协程开始,我们在 wg 上调用了 Done 方法 3 次,主协程将解除阻塞并开始执行进一步的代码。
上面的程序将产生如下结果:
main() started
Service called on instance 2
Service called on instance 3
Service called on instance 1
main() stopped
以上结果对你们来说可能不同,因为 goroutine 的执行顺序可能会有所不同。
添加方法接受 int 类型,这意味着 delta 也可以是负数。要了解更多信息,请查看官方文档[https://golang.org/pkg/sync/#WaitGroup.Add]。
worker pool
顾名思义,worker pool
:即工作池,是并发模式协同完成同样工作一些列 goroutine 的集合。在前面 WaitGroup 中,我们看到一组 goroutines 并发工作,但它们没有特定的工作内容。一旦将channel加入其中,让它们有相同的工作去完成,这些goroutines就会成为一个工作池。
因此,worker pool
背后的概念是维护一个 worker goroutines 池,它接收一些任务并返回结果。一旦他们都完成了他们的工作,我们就会收集结果。所有这些 goroutine 都出于各自的目的使用相同的通道。
让我们看一个带有两个通道的简单示例,即 tasks 和 results
这个程序发生了什么呢?
-
sqrWorker
是一个工作函数, 它接收三个参数tasks
channel,results
channel 和id
, 这个gorountine的任务是接收tasks
中的数据,计算平方,并把结果发送到results
中。
- 在 main 函数中, 创建了缓冲容量为10的
tasks
和results
channel,因此,在tasks
缓冲满之前, 发送操作都是非阻塞的。因此,设置大的缓冲值是一个是一个好主意。
- 然后我们生成多个
sqrWorker
goroutines实例,把前面建立的两个channel 和 id 作为参数,其中id是作为标记,标识是哪个gorountine正在执行任务。
- 然后我们将 5 个job传递给非阻塞的
tasks
channel。
- 完成了发送job到
tasks
后,我们关闭了它。这并不是必需的,但是如果出现一些错误,它将在将来节省大量时间。
- 然后使用 for, 循环 5 次,我们从
results
channel中提取数据。由于对空缓冲区的读取操作是阻塞的,因此将从工作池中调度一个 goroutine。在 goroutine 返回一些结果之前,main goroutine 将被阻塞。
- 因为在 worker goroutine 中模拟阻塞操作,调度程序将调用另一个可用的 goroutine,直到work gorountine变得可用时,它会将计算结果写入
results
channel。由于在缓冲区满之前,写入通道是非阻塞的,因此在此处写入results
通道是非阻塞的。此外,虽然当前的 worker goroutine 不可用,但执行了多个其他 worker goroutines,消耗了tasks
缓冲区中的值。在所有worker goroutine 消耗完tasks
后,tasks
通道缓冲区为空,for range 循环结束。当tasks
通道关闭时,它不会抛出死锁错误。
- 有时,所有工作协程都可能处于休眠状态,因此主协程将唤醒并工作,直到结果通道缓冲区再次为空。
- 在所有工作 goroutine 死后,main goroutine 将重新获得控制权并从结果通道打印剩余的结果并继续执行。
上面的例子解释了多个 goroutines 如何可以在同一个通道上提供数据并优雅地完成工作。当worker被阻塞时,goroutines 很灵活的解决了这个问题。如果删除 time.Sleep() 的调用,则只有一个 goroutine 独自完成该作业,因为在 for range 循环完成且 goroutine 终止之前不会调度其他 goroutine。
运行系统速度不同, 您可能得到与上述例子不同的结果,因为如果所有的gorountine即使是被阻塞很短的时间, main gorountine也会被唤醒执行程序。
现在,让我们使用sync.WaitGroup 实现相同的效果,但更优雅。
上面的结果看起来很整洁,因为main goroutine 中results
channel 上的读取操作是非阻塞的,而results
channel 开始读取前已经完成结果填充,而main goroutine 被 wg.Wait() 调用阻塞。使用 waitGroup,我们可以防止大量(不必要的)上下文切换(调度),这里是 7,而前面的例子是 9。但是有一个牺牲,因为你必须等到所有的工作都完成。
metux
Mutex【锁】 是 Go 中最简单的概念之一。但在解释之前,让我们先了解什么是竞态条件。 goroutines 有它们独立的堆栈,因此它们之间不共享任何数据。但是可能存在堆中的某些数据在多个 goroutine 之间共享的情况。在这种情况下,多个 goroutine 试图在同一内存位置操作数据,从而导致意外结果。下面展示一个简单的例子:
在上面的程序中,我们生成了 1000 个 goroutines,它们增加了初始为 0 的全局变量 i 的值。由于我们正在实现 WaitGroup,我们希望所有 1000 个 goroutines 一一增加 i 的值,从而得到 i 的最终值为 1000。当主 goroutine 在 wg.Wait() 调用后再次开始执行时,我们正在打印 i。让我们看看最终的结果。
value of i after 1000 operations is 937
什么?为什么我们不到1000?看起来有些 goroutines 不起作用。但实际上,我们的程序存在竞争条件。让我们看看可能发生了什么。
i = i + 1 calculation has 3 steps
-(1) 获取 i 当前值
-(2) 将i的值增加 1
-(3) 使用新值替换 i
让我们想象一个场景,在这些步骤之间安排了不同的 goroutine。例如,让我们考虑 1000 个 goroutine 池中的 2 个 goroutine,即:G1 和 G2。
当 i 为 0 时 G1 首先启动,运行前 2 个步骤,现在 i 现在为 1。但在 G1 更新步骤 3 中 i 的值之前,新的 goroutine G2 被调度并运行所有步骤。但是在 G2 的情况下,i 的值仍然是 0,因此在它执行第 3 步之后,i 将是 1。现在 G1 再次被安排完成第 3 步并从第 2 步更新 i 的值为 1。在 goroutines 的完美世界中在完成所有 3 个步骤后安排,2 个 goroutines 的成功操作会产生 i 的值是 2 但这里不是这种情况。因此,我们几乎可以推测为什么我们的程序没有将 i 的值变为 1000。
到目前为止,我们了解到 goroutine 是协作调度的。除非一个 goroutine 在并发课程中提到的条件之一阻塞,否则另一个 goroutine 不会取代它。既然 i = i + 1 没有阻塞,为什么 Go 调度器会调度另一个 goroutine?
你绝对应该在stackoverflow上查看这个答案。在任何情况下,您都不应该依赖 Go 的调度算法并实现自己的逻辑来同步不同的 goroutine。
确保一次只有一个 goroutine 完成上述所有 3 个步骤的一种方法是实现互斥锁。 Mutex(互斥)是编程中的一个概念,其中一次只有一个例程(线程)可以执行多个操作。这是通过一个例程获取对值的锁定,对它必须执行的值进行任何操作,然后释放锁定来完成的。当值被锁定时,没有其他例程可以读取或写入它。
在 Go 中,互斥锁数据结构(本质上是个map)由sync包提供的。在 Go 中,在对可能导致竞争条件的值执行任何操作之前,我们使用 mutex.Lock() 方法获取锁,然后是操作代码。一旦我们完成了操作,在上面的程序 i = i + 1 中,我们使用 mutext.Unlock() 方法解锁它。当任何其他 goroutine 在锁存在时尝试读取或写入 i 的值时,该 goroutine 将阻塞,直到操作从第一个 goroutine 解锁。因此只有 1 个 goroutine 可以读取或写入 i 的值,避免竞争条件。请记住,在整个操作被解锁之前,锁定和解锁之间的操作中存在的任何变量将不可用于其他 goroutine。
让我们用互斥锁修改前面的例子。
在上面的程序中,我们创建了一个互斥锁 m 并将指向它的指针传递给所有生成的 goroutine。在开始对 i 进行操作之前,我们使用 m.Lock() 语法获取了互斥锁 m 上的锁,并且在操作之后,我们使用 m.Unlock() 语法将其解锁。以上程序产生以下结果
value of i after 1000 operations is 1000
从上面的结果可以看出,互斥锁帮助我们解决了竞态条件。但是第一条规则是避免 goroutine 之间共享资源。
您可以在运行 Go run -race program.Go 之类的程序时使用种族标志测试 Go 中的竞争条件。在此处阅读有关比赛检测器的更多信息。
Concurrency Patterns 【并发模式】
通俗来讲,就是日常使用的常用范式。
以下是一些可以使程序更快更可靠的概念和方法。
- Generator 【生成器模式】
使用通道,可以实现更好实现生成器。
比如斐波那契数列,计算上开销很大, 我们可以提前计算好结果,并放入channel中, 等待程序执行到此,直接取结果即可, 而不必等待。
此图中,调用fib 函数,返回了一个channel,通过循环channel,可以接收到的计算好的数据。在 fib 函数内部,必须返回一个只接收通道,我们创建一个有buffer的通道,并在函数最后返回它。fib的返回值会将这个双向通道转换为单向只接收通道。在匿名 goroutine 中,使用 for 循环将斐波那契数推送到此通道,完成后,关闭此通道。在主协程中,使用 fib 函数调用的范围,我们可以直接访问这个通道。
- fan-in & fan-out 【多路复用】
fan-in 是一种多路复用策略,将过个输入通道组合,以产生输出通道。fan-out 是将单个通道拆分为多个通道的解复用策略。
package main
import (
"fmt"
"sync"
)
// return channel for input numbers
func getInputChan() <-chan int {
// make return channel
input := make(chan int, 100)
// sample numbers
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
// run goroutine
go func() {
for num := range numbers {
input <- num
}
// close channel once all numbers are sent to channel
close(input)
}()
return input
}
// returns a channel which returns square of numbers
func getSquareChan(input <-chan int) <-chan int {
// make return channel
output := make(chan int, 100)
// run goroutine
go func() {
// push squares until input channel closes
for num := range input {
output <- num * num
}
// close output channel once for loop finishesh
close(output)
}()
return output
}
// returns a merged channel of `outputsChan` channels
// this produce fan-in channel
// this is veriadic function
func merge(outputsChan ...<-chan int) <-chan int {
// create a WaitGroup
var wg sync.WaitGroup
// make return channel
merged := make(chan int, 100)
// increase counter to number of channels `len(outputsChan)`
// as we will spawn number of goroutines equal to number of channels received to merge
wg.Add(len(outputsChan))
// function that accept a channel (which sends square numbers)
// to push numbers to merged channel
output := func(sc <-chan int) {
// run until channel (square numbers sender) closes
for sqr := range sc {
merged <- sqr
}
// once channel (square numbers sender) closes,
// call `Done` on `WaitGroup` to decrement counter
wg.Done()
}
// run above `output` function as groutines, `n` number of times
// where n is equal to number of channels received as argument the function
// here we are using `for range` loop on `outputsChan` hence no need to manually tell `n`
for _, optChan := range outputsChan {
go output(optChan)
}
// run goroutine to close merged channel once done
go func() {
// wait until WaitGroup finishesh
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// step 1: get input numbers channel
// by calling `getInputChan` function, it runs a goroutine which sends number to returned channel
chanInputNums := getInputChan()
// step 2: `fan-out` square operations to multiple goroutines
// this can be done by calling `getSquareChan` function multiple times where individual function call returns a channel which sends square of numbers provided by `chanInputNums` channel
// `getSquareChan` function runs goroutines internally where squaring operation is ran concurrently
chanOptSqr1 := getSquareChan(chanInputNums)
chanOptSqr2 := getSquareChan(chanInputNums)
// step 3: fan-in (combine) `chanOptSqr1` and `chanOptSqr2` output to merged channel
// this is achieved by calling `merge` function which takes multiple channels as arguments
// and using `WaitGroup` and multiple goroutines to receive square number, we can send square numbers
// to `merged` channel and close it
chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)
// step 4: let's sum all the squares from 0 to 9 which should be about `285`
// this is done by using `for range` loop on `chanMergedSqr`
sqrSum := 0
// run until `chanMergedSqr` or merged channel closes
// that happens in `merge` function when all goroutines pushing to merged channel finishes
// check line no. 86 and 87
for num := range chanMergedSqr {
sqrSum += num
}
// step 5: print sum when above `for loop` is done executing which is after `chanMergedSqr` channel closes
fmt.Println("Sum of squares between 0-9 is", sqrSum)
}
完