作者近期在写一个项目时遇到了这样的需求:调用一个库API函数,函数内部又会拉起若干个后台goroutine。这时后台goroutine如果遇到错误想要及时通知库的使用者将不会是一件容易的事情,因为这是一个异步通知error的方法。
作者最终的解决方案概括为:使用者另启一个goroutine监听Err channel,库后台goroutine出现的错误将直接发送至Err channel中。
作者以自己项目简单举例:
func DaemonListen(err <-chan Errsocket){
for {
v, ok := <-err
if ok {
fmt.Println(v) /*处理错误*/
} else {
fmt.Println("Listen closed.")/*后台安全退出*/
return
}
}
}
func main(){
x := NewServer(/*......*/)
//后台会启动多个goroutine协同工作,该方法立即返回
x.Listen()
//启动守护goroutine监听error channel
go DaemonListen(x.ErrDone())
}
需要注意的是:
- channel类型不一定只是error。如果你需要更多的信息,完全可以是一个包含error的struct;
- 告诉守护goroutine可以安全退出的方法是关闭该channel,此时需保证该channel不会再被使用。若此时有goroutine试图向该channel发送error则会引发panic。
- 使用者不启动该goroutine可能会引发程序阻塞。
库如何安全关闭Channel
1. 维护一个goroutine注册链表
即确保关闭该channel之前其余所有goroutine都已经安全退出,不会再使用该channel。我们最先容易想到Go中的context标准库解决该问题。该标准库的作用也是维护层层调用的goroutine,并当parentCtx执行关闭操作时,能够顺利通知到所有childrenCtx,让所有childrenCtx安全退出。但遗憾的是,context只负责通知关闭,却不负责goroutine的退出顺序。即依然存在当channel被关闭时仍有子goroutine向channel发送数据的情况,我们仍需手动维护。另外,维护一个goroutine有时可能并不符合业务逻辑,例如:
当使用者调用exposedAPI关闭所有goroutine时,该API需要保存着所有运行着的goroutine信息。而事实上,goroutine并不需要向该API注册自己的信息。另外,当某goroutine异常宕机时,维护信息表也是一件较为复杂的事情。
2. errorDiversion
作者不清楚是否有业界前辈早已使用了类似或更成熟的技术,在这里作者只是提供自己处理该需求的一种方法。errDiversion(以下简称为eD)即另启一个守护goroutine,负责将error信息导流给上游channel或简单丢弃。
只需简单指定upstreamChannel和errChannel即可开启一个eD。eD
的工作逻辑如下:
- 若errChan(以下简称为eC)已被关闭,则自己安全退出;
- 若upstreamChan(以下简称为uC)已关闭,则将DATA直接丢弃;
- 若upstreamChan处于开启状态,将DATA发送至uC。
判断uC的状态可以尝试关闭通道并捕获panic;或使用flag变量记录uC开闭状态即可(注意维护数据一致性)。
为什么要新创建一个eD goroutine而不是在子goroutine发送error前先作检查:
新建eD的过程应该在父goroutine完成的,并只需要传递给子goroutine一个用于传递err的channel(eC)即可。对子goroutine屏蔽细节。
再次使用作者项目作简单演示:
func errDiversion(eD *eDer) func(eC chan Errsocket) {
//when upstream channel(uC) is closed(detected by closed flag),
//following data will be received but discarded
//when eC channel has closed, this goroutine will exit
return func(eC chan Errsocket) {
for {
err, ok := <-eC
if !ok { return }
if !eD.closed {
eD.eU <- err
}
eD.mu.Unlock()
}
}
}
func (s *server) Listen() { //Notifies the consumer when an error occurs ASYNCHRONOUSLY
/*......*/
//父goroutine创建eD,简单为子goroutine传递eC即可。
s.eDerfunc = errDiversion(&s.eDer) //closure
eC := make(chan error)
go s.eDerfunc(eC)
go handleListen(s, eC)
return
}
func handleListen(s *server, eC chan error) {
/*......*/
if err != nil {
eC <- err //s.errDoneTry即父goroutine传递的eC
}
/*......*/
}
建立函数闭包(closure)的原因:
对于一个eD而言,他的引用环境(uC,closed flag,mutex)是确定的。使用者只需传递eC即可使之正常工作。
数据一致性问题
最后简单提及维护数据一致性的问题。我们需要维护的有
- flag与channel close的关系;
- 确保eD能够及时执行(在uC关闭之前)【换言之,当eC存有error时,先等待eD处理error再关闭uC】。
在这里我们可以使用普通锁来实现:
type somestruct struct {
//under protected data
errDone chan Errsocket
errDoneTry chan Errsocket
closed bool
mu sync.Mutex
/*...other data...*/
}
func xxx(){
/*...*/
//close upstream channel
s.mu.Lock()
s.closed = true
close(s.errDone)
s.mu.Unlock()
}
func subgoroutine(){
/*...*/
if err != nil{
s.mu.Lock() //Notice, lock before sending data to channel
eC <- err
}
}
errDiversion的代码也需要作部分调整:
func errDiversion(eD *eDer) func(eC chan Errsocket) {
return func(eC chan Errsocket) {
for {
err, ok := <-eC
if !ok {
return
}
if !eD.closed {
eD.eU <- err
}
eD.mu.Unlock()
}
}
}
这样我们就完成了并发流程控制以及数据的一致性。注意不要在eD中上锁,因为读取eC是一个阻塞过程,会引发死锁。正确的做法是向eC传递error之前上锁。
多eD嵌套的解决方案
即某上游eD(下简称为A)的eC是某下游eD(下简称为B)的uC。他们是共享同一个channel而非传递的关系。当B发送error至uC(a.eC)时,需要获得上游的锁并加锁。
为要实现该功能,将errDiversion代码改为
//......
if !closed{
if eD.pmu != nil {
eD.pmu.Lock() //send to upstream channel
}
eD.eU <- err
}
//......
即可。但在这么做之前务必三思,是否一定要使用嵌套的方法。事实上,你也可以将下游的uC与上游共享:即下文提到的1:m。
1:m或n:m的解决方案
即多个eD的共享同一个uC。同一个uC意味着同一把锁(Mutex),同一个Flag标记uC状态。因此父goroutine应先将mutex和flag设置好,再将参数通过指针的方式传递给子goroutine。
等待错误反馈
上文提到的错误处理都是异步进行的。即不会等待错误处理,直接执行下面的流程。如果需要,可以阻塞代码流程等待错误反馈,即使错误处理同步进行。方法很简单,将mutex解锁的动作交与使用者即可。这样做的缺点也是显而易见的,使用者需要参与到库框架的运作中,增加了库运行的风险。同时,同步进行的错误处理也可能会降低运行效率。
总结
这套机制为处理goroutine异步通知
error提供了一种有效解决方案,库的使用者仅需启用一个goroutine监听errchannel即可。更贴心的话,库还可以帮助使用者创建监听errchannel的框架,使使用者通过注入的方式填补框架即可。
不足之处在于:
- 普遍情况下,开启一个子goroutine就需要另启一个eD作错误导流。过多的goroutine从
性能
而言并不是特别优秀; - 另外,他违反了通道关闭原则(一般原则下不允许接收方关闭通道和不能关闭一个有多个并发发送者的通道。 换而言之, 你只能在发送方的 goroutine 中关闭只有该发送方的通道)。关于如何优雅地关闭channel可参考这篇文章。
ps. 一套基于Socket的高并发Client-Server双向通信库chitchat-GitHub,目前还处于(缓慢悠闲的)开发状态。欢迎大家提供宝贵意见。