Goroutines
- 模型:和其他goroutine在共享的地址空间中并发执行的函数
- 资源消耗: 初始时非常小的栈开销,之后随着需求在堆上增减内存
- 创建和销毁: go 关键字表示创建一个新的goroutine(注意不会马上执行,而是放在调度的队列中等待调度), 函数运行结束后,goroutine自动销毁
goroutine才是golang的优势之处,简单,轻量的并发模型。
Channel
- 数据类型的一种,类似消息队列,便于不同goroutine间通信。
- 可单可双通道,可以包含各种类型的数据;也可以分带buffer和不带buffer的
- 从空的channel中读取数据会阻塞(关闭的管道不会阻塞),同样往满的channel中写数据也会阻塞
- channel不像文件、网络套接字那样,close不会释放资源,只是不再接收更多消息,因而不需要通过close来释放channel资源;但是如果有range loop的,需要close掉,要不range loop会block住
- 如果往关闭的channel中写入数据,则会panic;如果是读数据的,先读取管道中多余的数据,之后都会取得零值
- 如果事先不知道有多少个channel,可以用reflect.Select来选择
Sync package
- 有读写锁、写锁,atomic,waitgroup
There’s a disconnect between the concurrency primitives that Go, and the expectations of those who try it.
golang提供了非常简便的并发模型,但并发编程仍然不容易。
正文
先从'go'关键字开始。
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
for {}
}
hello world web2.0 版本
上面的代码有什么问题?
for {}
for{}
是个死循环,会一直占用cpu,导致cpu空转。
怎么解决呢?
for {
runtime.Gosched()
}
让出CPU,但这种做法还是会占用cpu,没有解决根本问题。有更好点的办法,用select{}
替代for{}
,空select{}
语句会一直阻塞。
上面的示例仅仅为了演示一些小问题,不会正式地使用,下面这种写法,才是我们经常使用的正确示例:
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
If you have to wait for the result of an operation, it’s easier to do it yourself.
- 第一个建议:如果需要等待某个操作的结果,不需要再新建goroutine运行这个操作,同时阻塞外层goroutine
既然最大的特点是并发,当然不能错过并发的示例了:
func restore(repos []string) error {
errChan := make(chan error, 1)
sem := make(chan int, 4) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
sem <- 1
go func() {
defer func() {
wg.Done()
<-sem
}()
if err := fetch(repo); err != nil {
errChan <- err
}
}()
}
wg.Wait()
close(sem)
close(errChan)
return <-errChan
}
这个是gb-vendor早期的一个版本,并发的获取依赖资源
仔细观察下,觉得代码怎么样,能只出哪些问题?
首先来看下这一对代码块:
defer func() {
wg.Done()
<-sem
}()
和这段:
wg.Wait()
close(sem)
close(sem)在wg.Wait()之后,wg.Wait()在wg.Done()之后,但是并不能保证在<-sem之后发生,也就是说close(sem)和<-sem谁先谁后是没有保证的。那么有可能导致panic么?
参考最上面关于channel的介绍:从关闭了的channel中读取数据,(如果有)先取出管道中的数据,之后会直接返回零值,不会阻塞。
简单的修改下defer,可以让执行顺序变得清晰:
func restore(repos []string) error {
errChan := make(chan error, 1)
sem := make(chan int, 4) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
sem <- 1
go func() {
defer wg.Done()
if err := fetch(repo); err != nil {
errChan <- err
}
<-sem
}()
}
wg.Wait()
close(sem)
close(errChan)
return <-errChan
}
- 第二建议:锁和信号量的释放顺序与他们获取的顺序相反。
有点类型多层锁,内层的锁先释放,而后才是外层。
Why close(sem)?
channel不像文件、网络套接字那样,close不会释放资源,只是不再接收更多消息,因而不需要通过close来释放channel资源;但是如果有range loop的,需要close掉,要不range loop会block住.
这里没有channel的range loop,因而可以删除close(sem)
再来看看sem是如何使用的
sem是为了在任何时候,仅有有限的fetch操作在运行。仔细观察下前面的代码,有什么疑问么?
代码仅仅保证了不超过4个goroutine在运行,而不是4个fetch操作正在运行,再体会下两者的却别。
前面的代码只保证不超过4个goroutine再运行,当第五repo时,会阻塞for循环,等待之前某个goroutine执行完了之后,再新建一个goroutine(不会马上执行),相对来说效率低下。
还有一种是将所需要的goroutine放入调度池,然后直接运行:
func restore(repos []string) error {
errChan := make(chan error, 1)
sem := make(chan int, 4) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for _, repo := range repos {
go func() {
defer wg.Done()
sem <- 1
if err := fetch(repo); err != nil {
errChan <- err
}
<-sem
}()
}
wg.Wait()
close(errChan)
return <-errChan
}
将 sem <- 1放入go func里面,所有的goroutine都会创建好,并马上执行.
- 建议三:对于信号量来说,在哪里用就在哪里获取
bug都搞定了?
回到上面的代码,注意 for .. range 和fetch(repo) 代码块,看出什么问题了么?
有两个问题:
1.goroutine中的变量repo会随着每次迭代而改变,可能导致所有的fetch操作都是抓取最后一次的值
2.如果对变量repo同时有读写操作的话,会引起竞争
怎么处理呢?给匿名方法添加参数:
func restore(repos []string) error {
errChan := make(chan error, 1)
sem := make(chan int, 4) // four jobs at once
var wg sync.WaitGroup
wg.Add(len(repos))
for i := range repos {
go func(repo string ) {
defer wg.Done()
sem <- 1
if err := fetch(repo); err != nil {
errChan <- err
}
<-sem
}(repos[i])
}
wg.Wait()
close(errChan)
return <-errChan
}
- 建议4:避免在goroutine中直接使用外部变量,最好以参数的方式传递
最后一个了吧?
wait, one more bug
回到上面代码,仔细观察errChan和fetch error的处理,估计打死都看不出问题吧?
给点小提示,如果超过一个error,会出现什么情况?
close(errChan)依赖于wg.Wait()先执行,wg.Wait()依赖于wg.Done()先执行,wg.Done又依赖于errChan <-err先执行,但errChan的buffer只有1,goroutine却有四个。但超过一个error时,boom...,errChan <- err 操作阻塞了,形成死锁。
解决办法,errChan的buffer等于repos的个数: errChan := make(chan error, len(repos))
- 最后一条建议:当你创建goroutine时,需要知道什么时候,怎么样退出这个goroutine
golang提供的并发模型很简单,但是用好并发还需要掌握各种常见模式和场景,而不仅仅是语言方面的知识
个人博客: blog.duomila.club
参考资料:
https://dave.cheney.net/paste/concurrency-made-easy.pdf