参考:
https://www.flysnow.org/2017/05/12/go-in-action-go-context.html
http://c.biancheng.net/view/5714.html
https://www.liwenzhou.com/posts/Go/go_context/
关键点
通过关键词汇,实现快速理解,记忆的目的:
1、 协程
创建好后,如何结束呢?:
第一,自己业务运行
完毕后,自己结束
- sync.WatiGroup
第二,其他
地方,给协程发结束信号,协程收到结束信号后,自己结束
channel+select
context
2、
context上下文其实就是goroutine的上下文3、context就是
控制协程goroutine结束的方式,提供了4种
从这篇文章<<Go语言并发简述>>中, 我们已经知道如何创建一个协程goroutine,
那么现在会出现一个问题:协程创建好,它自己去运行了,那么协程什么结束呢?
从结束方式的视角看,大概有几种吧:
- 第一,自己业务运行完毕后,自己结束
- 第二,其他地方,给协程发结束信号,协程收到结束信号后,自己结束
- channel+select
- context
1、等待组sync.WaitGroup方式
sync.WaitGroup它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("1号完成")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("2号完成")
wg.Done()
}()
wg.Wait()
fmt.Println("好了,大家都干完了,放工")
}
一个很简单的例子,一定要例子中的2个goroutine同时做完,才算是完成,先做好的就要等着其他未完成的,所有的goroutine要都全部完成才可以。
1.1、sync.WaitGroup的使用场景?
比方说,有一个大的任务,为了提高性能,改成并发去做,需要将这个任务分割成n个独立的小任务,每个任务对应一个协程,这n个任务中,肯定有先完成的,有最后完成的,等n个任务都完成了,那么这个大的任务才算完成。
sync.WaitGroup就是保证所有的小任务都完成了,阻塞才结束,才可以继续进行别的业务
1.2、sync.WaitGroup不适合什么样的场景呢?
在实际的业务中,可能会存在一种场景:需要我们主动的发送一个特殊信号,告诉某一个goroutine结束,而不是让goroutine自己业务运行完后结束。
比方说,一个1G的大文件,通过10个协程分别传输100M, 其中某几个协程出问题了,已经结束运行了,而其他协程还在传输中,其实,这次任务已经失败了,这几个协程就没必要继续传输了,会造成内存泄露,此时,我们需要一种机制,来告诉协程,让协程主动结束。
sync.WaitGroup是不能实现这个要求的。
2、channel+select(通道+多路通道复用)
我们都知道一个goroutine启动后,我们是无法控制他的,大部分情况是等待它自己结束,那么如果这个goroutine是一个不会自己结束的后台goroutine呢?比如监控等,会一直运行的。
这种情况化,一个傻瓜式的办法是全局变量,其他地方通过修改这个变量完成结束通知,然后后台goroutine不停的检查这个变量,如果发现被通知关闭了,就自我结束。
这种方式也可以,但是首先我们要保证这个变量在多线程下的安全,基于此,有一种更好的方式:chan + select 。
2.1、例子
假设有一批数据需要进行某种处理(比如打印输出),如果数据中存在不符合要求的数据值,就说明这批数据不符合要求,结束运行
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
fmt.Printf("A、goroutine num=\t%d\n", runtime.NumGoroutine())
quit := make(chan bool)
workForChan := make(chan int)
data := []int{2,3,4,0,1,3,6,8,9}
go func() {
for {
select {
case <- quit:
fmt.Println("goroutine end")
return
case n := <- workForChan:
fmt.Println("n = ", n)
}
}
}()
fmt.Printf("B、goroutine num=\t%d\n", runtime.NumGoroutine())
for _, d := range data {
if d == 0 {
quit <- true
break
}
workForChan <- d
}
fmt.Printf("C、goroutine num=\t%d\n", runtime.NumGoroutine())
time.Sleep(time.Second)
fmt.Printf("D、goroutine num=\t%d\n", runtime.NumGoroutine())
}
分析:
第10行,打印出当前的协程数量
第11,11行定义一个结束通道,一个工作通道
第14行,准备的待测试数据
-
第16-26行,开启运行一个协程,使用了select多通道复用,
一个通道用于接收是否结束协程
一个通道用于接收数据
第28行,测试当前的协程数量
-
第20-36行,遍历获取数据,对数据进行合理性校验,
没有问题的话,就通过通道发送给协程,进行业务处理
如果数据有问题的话,就给
结束通道发送信号,告诉协程,可以进行结束清理工作了
第37行,给协程发送结束通知后,查询当前的协程数量
第38行,人工休息一下
第39行,打印当前的协程数量
模拟了,在某种情况下,主动给协程发送信号,让协程主动结束运行,也就是说,控制了单个协程的结束。

2.2、chan+select方式 不适合什么场景?或者说当有很多goroutine都需要控制结束怎么办呢?
这种chan+select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性,
如果有很多goroutine都需要控制结束怎么办呢?
如果这些goroutine又衍生了其他更多的goroutine怎么办呢?
如果一层层的无穷尽的goroutine呢?
这就非常复杂了,即使我们定义很多chan也很难优雅的解决这个问题,
因为goroutine的关系链就导致了这种场景非常复杂。
3、初识context.Context
上面说的这种场景是存在的,比如一个网络请求Request,每个Request都需要开启一个goroutine做一些事情,这些goroutine又可能会开启其他的goroutine。
所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的Context,称之为上下文非常贴切,它就是goroutine的上下文。
因此,从业务逻辑角度看,会存在一组业务上有关系的goroutine, 为每一组都创建一个上下文,来管理这一组goroutine的结束
3.1、例子
模拟上面的情况,context控制多个协程的结束
不用关心代码实现的业务。
这几个协程主要是打印输出
然后模拟,主协程里运行了2秒钟的业务,然后由于某种原因,需要给子协程发送信号,结束子协程的运行。
其中,为了测试全面,go4协程不受context的控制。
package main
import (
"fmt"
"runtime"
"time"
"context"
)
//每隔一定时间,打印一次
func go1(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 1 end")
return
case <-time.NewTicker(time.Millisecond * 1000).C:
fmt.Println("goroutine 1")
}
}
}
func go2(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 2 end")
return
case <-time.NewTicker(time.Millisecond * 2000).C:
fmt.Println("goroutine 2")
}
}
}
// 启动2个协程
func go3(ctx context.Context) {
go go3A(ctx)
go go3B(ctx)
}
func go3A(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 3a end")
return
case <-time.NewTicker(time.Millisecond * 2000).C:
fmt.Println("goroutine 3a")
}
}
}
func go3B(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("goroutine 3b end")
return
case <-time.NewTicker(time.Millisecond * 1000).C:
fmt.Println("goroutine 3b")
}
}
}
func go4() {
for {
fmt.Println("other goroutine")
time.Sleep(time.Second * 10)
}
}
//main,也是一个协程方式运行,为主协程,其他都是子协程
func main() {
//查看运行前,协程数量
fmt.Printf("A、goroutine num=\t%d\n", runtime.NumGoroutine())
ctx, cancel := context.WithCancel(context.Background())
//go1,go2,go3 受context管理
go go1(ctx)
go go2(ctx)
// go3这个协程,创建2个子线程,会主动关闭
go go3(ctx)
go go4()
// 休息是为了,等待前面的协程创建好
time.Sleep(time.Millisecond * 500)
fmt.Printf("B、goroutine num=\t%d\n", runtime.NumGoroutine())
// 这里是模拟业务,会在这里阻塞2秒钟,主main协程中运行了2秒钟的业务,然后让context控制的协程退出
select {
case <-time.After(time.Second * 2):
fmt.Println("开始触发结束信号")
cancel()
}
// 休息一下,context管理的协程全部关闭
time.Sleep(time.Millisecond * 500)
// 查看当前有多少个协程
fmt.Printf("C、goroutine num=\t%d\n", runtime.NumGoroutine())
}
上面例子中,协程之间的关系是:

context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。
在goroutine中,使用select调用<-ctx.Done(),
ctx.Done()返回一个通道,此通道处于阻塞状态,直到收到context发送的信号
3.2、那么是如何发送结束指令的呢?
这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。
我们调用它cancel()就可以发出取消指令,然后协程中的ctx.Done()通道就会收到信号,然后就会执行case里的语句了。

3.3、注意一点
并不是context让协程关闭的,context只是发送信号,协程收到信号后自己决定要不要关闭

3.4 稍微修改下代码,模拟一下上面的情况:
go3b函数,收到context发送的信号后,不再退出
将58行注释掉,这样协程不会退出了

协程收到context发送的信号后,
ctx.Done()
通道不再阻塞、执行case分支、
-
协程
自己决定要做什么业务(这里不在return了)
协程收到context发送的信号,ctx.Done不在阻塞、执行case分支、自己决定是否要关闭
最后显示,还剩3个协程 ,
说明go3b协程,收到context发送信号后,协程并没有关闭
