用处
保护下游,针对下游的同一批请求,只有一个负责去请求,其他等待结果;
例如:缓存更新能够做到对同一个失效key的多个请求,只有一个请求执行对key的更新操作。
示例
func TestDoDupSuppress(t *testing.T) {
var g Group
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {
atomic.AddInt32(&calls, 1)
return <-c, nil
}
const n = 10
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() { // n个协程同时调用了g.Do,fn中的逻辑只会被一个协程执行
v, err := g.Do("key", fn)
if err != nil {
t.Errorf("Do error: %v", err)
}
if v.(string) != "bar" {
t.Errorf("got %q; want %q", v, "bar")
}
wg.Done()
}()
}
time.Sleep(100 * time.Millisecond) // let goroutines above block
c <- "bar"
wg.Wait()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("number of calls = %d; want 1", got)
}
}
- fn只被执行了一次 -> calls的值为1;
- 其他的携程都能拿到fn执行的结果;
原理
map存储每个key对应的call,每个call会被多个携程同时调用。
一个call里边有个waitgroup,第一个携程去执行调用,其他携程阻塞在wg上边。 (关键就是这个wg)
- call的结构
type call struct {
wg sync.WaitGroup
val interface{} //最终返回的结果
err error
}
- map的结构
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
- 调用
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait() //其他的请求阻塞
return c.val, c.err
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn() //第一个去执行调用
c.wg.Done() //同一批都返回
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
- 实际使用的例子
ProductSku = singleflight.Group{}
skuList, err, shared := ProductSku.Do(strconv.FormatInt(productId, 10), func() (i interface{}, e error) {
return rpc.GetProductSku(ctx, productId, nil)
})