目录
自问自答
- errgroup(以下简称eg)如何控制并发任务数量
- eg如何获取错误信息,获取的是哪一个任务的错误信息
- eg如何通知其他任务取消
- 对比go-zero的servergroup,区别在哪里
1.errgroup介绍
以下摘自仓库描述
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
errgroup.Group is related to sync.WaitGroup but adds handling of tasks returning errors.
即:
- 实际上是对sync.waitgroup的封装
- 实现子协程分组
- 提供感知组内错误的功能
我们接下来通过源码阅读来知道他是如何实现上面的功能的
2.源码走读
errgroup对外暴露只有5个方法2.1主要数据结构-Group
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
- cancel,是WithContext函数生成的上下文,cancelCtx的取消函数
- errOnce,用于保证err只被赋予一次
- err ,保存启动协程时返回的第一个错误
- sem ,基于channel实现的信号量,用于限制协程的并发执行数量,这个也是最近的新增功能,类型token实际上就是空结构体struct{}
同时意味着我们不能像waitgroup一样通过简单的var去声明(waitgroup结构体只有几个数字成员),否则cancel==nil && sem==nil,只能通过对外暴露的函数来声明
2.2 WithContext和SetLimit
返回一个可以被取消的上下文
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}
实际上就是对context.WithCancelCause的封装,Group.cancel对标的就是context.CancelCauseFunc
type CancelCauseFunc func(cause error)
那么发现了一个问题,这里的Group.sem还是没被初始化==nil,实际上是在调用setlimit时才被初始化
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
2.3最关键的GO函数和trygo
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
当通道满的时候,即并发数量等于我们所限制的数量时就会被阻塞,从而限制并发数量
这也是与TryGO唯一不同的地方
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
trygo会直接快速失败
go剩余部分代码:
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
- 使用add增加一个任务数量
- 启动一个协程执行传入函数f
- 当函数执行返回错误时,使用sync.once对err赋值(使用锁会覆盖第一次的错误信息),然后执行cancel函数
- 最后执行done函数,退出主协程的阻塞(前提是调用了wait方法进行阻塞)
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
我们回顾一下前文学习 context.Withcancel函数生成的cancel会干那些事情:
- 往通道发送信号
- 调用生成子context的cancel方法
- 删除ctx的map中存放的子ctx
说白了最主要的就是往ctx的通道发送东西,然后我们可以通过ctx.Done方法获取
也就是说errgroup包的组内发生错误时,并不能取消其他子任务,而只是报告错误,我们需要主动监听ctx.Done
2.4Wait
就是waitgroup.Wait的封装
3.实验--管理多个服务的生命周期
以下都是伪代码,需要自行完善
我们假设有以下服务接口
type Server interface {
Run() error
Stop() error
Name() string
}
我们需要在一个进程内对多个端口的服务进行管理,典型的例子如同时提供http API服务和rpc服务,需要确保一个失败就全失败,假设名字就叫MainAPP
type MainApp struct {
//接收中断内部信号
sigs []os.Signal
//代理服务
servers []Server
ctx context.Context
cancle context.CancelFunc
}
利用errgroup就能很好的实现该功能
func (a *MainApp) Run() error {
c := make(chan os.Signal, 1)
signal.Notify(c, a.sigs...)
g, ctx := errgroup.WithContext(a.ctx)
}
for _, srv := range a.servers {
srv := srv
g.Go(func() error {
<-ctx.Done()
return srv.Stop()
})
g.Go(func() error {
return srv.Run()
})
}
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-c:
a.cancle()
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
需要注意g.wait只捕获第一个服务运行异常的错误,因此select中返回错误也不会被捕获,没意义;但是还有一个清空是我们主动执行ctrl+c中断,select可能会第一个返回,但是也没意义所以是nil
4.增强阅读
4.1kratos框架的启动部分
github代码地址点击直达
阅读时可暂时忽略registrar部分,这是服务注册部分,内部是使用errgroup实现的
4.2 go-zero框架使用ServiceGroup管理多个服务启动和停止
使用方法文章阅读
源码直达
他实际上是也是封装了sync.once和waitgroup(syncx.once也是套娃)
ServiceGroup struct {
services []Service
stopOnce func()
}
func NewServiceGroup() *ServiceGroup {
sg := new(ServiceGroup)
sg.stopOnce = syncx.Once(sg.doStop)
return sg
}
func (sg *ServiceGroup) doStop() {
for _, service := range sg.services {
service.Stop()
}
}
func (sg *ServiceGroup) Add(service Service) {
// push front, stop with reverse order.
sg.services = append([]Service{service}, sg.services...)
}
主要方法doStart
func (sg *ServiceGroup) doStart() {
routineGroup := threading.NewRoutineGroup()
for i := range sg.services {
service := sg.services[i]
routineGroup.Run(func() {
service.Start()
})
}
routineGroup.Wait()
}
type RoutineGroup struct {
waitGroup sync.WaitGroup
}
func NewRoutineGroup() *RoutineGroup {
return new(RoutineGroup)
}
func (g *RoutineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
不难看出没有提供错误同时取消的功能,它认为两者就是互相独立互不干扰