singleflight

说明

可以直接看看go官方扩展包,大致用途就是针对并行的返回相同的多个请求,通过某种方式只真实的请求一次,这种方式其实很简单,就是放行一个请求,然后依赖锁的互斥,使得其他的请求保持等待,直到请求返回,其他请求直接使用返回结果,就避免了重复请求

源码

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"

import (
    "bytes"
    "errors"
    "fmt"
    "runtime"
    "runtime/debug"
    "sync"
)

// 这个错误意味着实际请求的方法调用了runtime.Goexit进行了退出
// 可用于通知其他等待请求的goroutine
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")

// 包装panic的错误,里面有错误信息和调用栈
// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
    value interface{}
    stack []byte
}

// 实现error接口
// Error implements error interface.
func (p *panicError) Error() string {
    return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

// new一个,其实就是把调用栈加进去
func newPanicError(v interface{}) error {
    stack := debug.Stack()

    // The first line of the stack trace is of the form "goroutine N [status]:"
    // but by the time the panic reaches Do the goroutine may no longer exist
    // and its status will have changed. Trim out the misleading line.
    if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
        stack = stack[line+1:]
    }
    return &panicError{value: v, stack: stack}
}

// call其实就是代表一个请求
// call is an in-flight or completed singleflight.Do call
type call struct {
    // 这个用来做并行请求间的互斥和同步
    // wg.wait可以用来做互斥,wg.done可以用来做同步
    wg sync.WaitGroup

    // 请求返回
    // These fields are written once before the WaitGroup is done
    // and are only read after the WaitGroup is done.
    val interface{}
    // 请求错误
    err error

    // 标记请求是否从集合中删除
    // forgotten indicates whether Forget was called with this call's key
    // while the call was still in flight.
    forgotten bool

    // dups标记有多少个其他的请求共享结果
    // These fields are read and written with the singleflight
    // mutex held before the WaitGroup is done, and are read but
    // not written after the WaitGroup is done.
    dups  int
    // chans标记有多少个请求通过channel共享结果
    chans []chan<- Result
}

// calls的集合
// 其实很简单,就是一把锁和一个map,如我们开篇想的一样
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

// Result主要是为了方便把请求的返回结果包装好丢到channel中
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

// Do方法通过key来找到集合g中是否有相同key的进行中的请求
// 如果没有则会调用fn来执行请求,该方法会直接返回请求的返回、错误以及是否被其他请求共享
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    // 上来就一把锁,因为后面会立即操作g.m
    // go的map是不支持并发的
    g.mu.Lock()
    // 这里是一个延迟初始化,也就是我们使用的时候可以不用管Group.m
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    // 先判断g.m中是否有key对应的call
    // 如果有,说明有相同的请求在执行,那么等待结果就好了
    if c, ok := g.m[key]; ok {
        // 有了新的等待的请求,dups计数自增1
        c.dups++
        // 到这里就可以解锁了
        // if之前解锁可以吗?想啥呢,在读map呢
        // c.dups++之前解锁可以吗?想啥呢,除非dups++换成atomic.Add之类的原子操作
        // 总没人想问为什么不在wg.Wait()之后解锁吧,出门右转啊
        g.mu.Unlock()
        c.wg.Wait()

        // 能走到这里,说明fn执行完毕了
        // 判断是不是panic以及是否调用了runtime.Goexit()主动退出goroutine
        // panicError和errGoexit两种错误类型会在doCall方法里面处理
        if e, ok := c.err.(*panicError); ok {
            // 透传panic
            panic(e)
        } else if c.err == errGoexit {
            // 透传Goexit
            // 意思就是某个请求进行了Goexit,那么其他等待该请求结果的其他请求也会进行Goexit
            // 注意runtime.Goexit()虽然看起来效果跟panic差不多(立即中断执行并退出,退出之前还能执行注册过的defer),但是不会被recover,也只会退出当前的goroutine,不影响其他的
            runtime.Goexit()
        }
        // 返回请求结果
        return c.val, c.err, true
    }

    // 如果没有,那就new一个call并执行返回结果
    c := new(call)
    // wg添加一个等待,如果没有add就wait是会panic的
    c.wg.Add(1)
    // 加入到集合中
    g.m[key] = c
    // 这里的解锁一定要放到操作g.m之后,因为虽然相同的多个并行请求能走到这里的只有一个
    // 但是不同请求也会共享g.m,所以对g.m的操作务必保证全局有序
    g.mu.Unlock()
    // 调用doCall来执行fn并处理某些特殊情况
    g.doCall(c, key, fn)
    // 返回结果,结果都存在c中,这样能共享给等待请求结果的其他请求
    return c.val, c.err, c.dups > 0
}

// DoChan方法跟Do差不多,只是有些许不同,比如返回的是一个只读的channel而不是请求结果
// 不过这个channel也是用来传递请求结果的
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    // 无非就是比Do方法多初始化了个channel
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        // 把初始化好的channel加入到c的共享channel集合中
        c.chans = append(c.chans, ch)
        // 注意,这里的解锁必须在操作了c.chans之后,否则可能导致chans中的channel一直拿不到结果
        // 因为doCall方法里delete(g.m, key)的操作也是获取到g.mu.Lock()之后执行的
        // 这样两个锁之间是互斥的,保证了g.m的对key的删除和读取是互斥的
        // 而且doCall中删除key的锁也会锁住遍历c.chans并放入result
        // 所以获取g.m[key]的锁也需要锁住append(c.chans, ch)才能保证加入channel集合和遍历赋值channel是先后序的
        g.mu.Unlock()
        // 这里就不用跟Do方法一样使用wg.Wait()来等待了,因为返回的是channel,直接通过读取channel阻塞等待就好了
        return ch
    }
    // 如果集合中没有key对应的请求就放行一个
    c := &call{chans: []chan<- Result{ch}}
    // 这里Add的原因是考虑后续可能会使用Do方法来执行相同key的请求,同时也是为了兼容doCall的wg.Done()避免panic
    // 因为不管是Do方法还是DoChan方法都是调用doCall,都会把结果写入到call中
    // 不同的是DoChan返回的是channel,把call中的结果冗余了一份到Result中,而Do则是直接返回结果
    // 所以即使先使用DoChan调用请求,再使用Do调用相同的请求,也是可以复用DoChan的结果
    // 同理先使用Do再使用DoChan也是一样的,无非就是make一个channel丢到c.chans里面
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    // 因为返回的是channel不需要等待,直接go执行更快
    go g.doCall(c, key, fn)

    return ch
}

// doCall是真正执行fn的方法,同时也兼容了Do和DoChan,返回结果的同时会将结果同步一份到c.chans中(如果需要)
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    // 标记正常返回,非panic,非runtime.Goexit
    normalReturn := false
    // 是否需要recover,即是否有panic
    // 通过这两个标签,就可以过滤出runtime.Goexit
    recovered := false

    // 这是一个典型的双defer操作来区分出panic和runtime.Goexit
    // 主要吧是因为这个runtime.Goexit没法被recover,但是还好能触发defer
    // 建议先看下面的defer
    // use double-defer to distinguish panic from runtime.Goexit
    // more details see https://golang.org/cl/134395
    defer func() {
        // 第二个defer处理完之后
        // 如果即不是正常返回又不是panic,那就是runtime.Goexit了
        // the given function invoked runtime.Goexit
        if !normalReturn && !recovered {
            //设置对应错误
            c.err = errGoexit
        }

        // 到这里就可以唤醒wg.Wait了,因为返回结果收集到了
        // 对应的error信息也设置完了
        c.wg.Done()
        // 下面是操作g.m了,需要lock
        g.mu.Lock()
        // defer嵌套defer
        defer g.mu.Unlock()
        // 判断key是否已经从g.m中删除,因为可以通过Forget方法主动删除
        if !c.forgotten {
            delete(g.m, key)
        }

        // 其他等待的请求会在Do和DoChan中处理错误进行panic和Goexit
        // 当前请求需要在当前goroutine中处理错误
        if e, ok := c.err.(*panicError); ok {
            // In order to prevent the waiting channels from being blocked forever,
            // needs to ensure that this panic cannot be recovered.
            // 如果有监听结果的channel,为了防止panic被上层recover导致goroutine泄露
            // 直接使用go panic(e)来终止掉进程,这个感觉够狠的,难道不能遍历下c.chans然后close掉吗?
            if len(c.chans) > 0 {
                go panic(e)
                // 这里会阻塞,主要是为了panic的时候能保留本次调用的现场
                select {} // Keep this goroutine around so that it will appear in the crash dump.
            } else {
                panic(e)
            }
        } else if c.err == errGoexit {
            // 因为是当前goroutine,本来就执行完了,没啥可做的,符合Goexit
            // 不过有个问题,如果是调用Do方法就问题不大,但是如果调用DoChan方法可能会导致goroutine泄露,具体可以看后面的例子
            // Already in the process of goexit, no need to call again
        } else {
            // Normal return
            // 通知c.chans的每个嗷嗷待哺的channel
            for _, ch := range c.chans {
                ch <- Result{c.val, c.err, c.dups > 0}
            }
        }
    }()

    // 包到一个方法中,这样如果panic了可以通过recover恢复执行
    // runtime.Goexit则会中断到第一个defer
    // 基于这个差别就可以通过操作标志变量来区分啦
    func() {
        defer func() {
            // 如果没有正常返回,则尝试recover,panic会被recover恢复,并new一个panic的error
            if !normalReturn {
                // Ideally, we would wait to take a stack trace until we've determined
                // whether this is a panic or a runtime.Goexit.
                //
                // Unfortunately, the only way we can distinguish the two is to see
                // whether the recover stopped the goroutine from terminating, and by
                // the time we know that, the part of the stack trace relevant to the
                // panic has been discarded.
                if r := recover(); r != nil {
                    c.err = newPanicError(r)
                }
            }
        }()

        // 执行请求
        c.val, c.err = fn()
        // 设置正常返回
        normalReturn = true
    }()

    // 能走到这里,要么正常返回,要么panic
    if !normalReturn {
        // 如果不是正常返回,那么就是panic,设置恢复标志
        recovered = true
    }
}
// 这里抛出一个疑问,难道一个defer不能区分panic和Goexit吗,答案是不可以
// 如果是一个defer,通过recover是可以抓住panic,但是Goexit呢,要知道Goexit后fn的返回是没有err,val也应该是nil
// 除非fn中处理了Goexit返回了err,但是这是不合理的,因为你不应该强制使用者了解底层的处理细节,从而遵守约定


// 这个比较简单,就是手动删掉g.m中的某个key
// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
    g.mu.Lock()
    if c, ok := g.m[key]; ok {
        c.forgotten = true
    }
    delete(g.m, key)
    g.mu.Unlock()
}

Bad Case

func main() {
    g := &singleflight.Group{}
    f := func() (interface{}, error) {
        time.Sleep(time.Second * 2)
        // panic("test")  如果是panic,会终止掉程序
        // runtime.Goexit() 如果是Goexit,会导致第二个goroutine一直在for循环中等待
        return nil, nil
    }

    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer func() {
            if e := recover(); e != nil {
                fmt.Println("recover")
            }
            wg.Done()
        }()
        v, e, s := g.Do("joker", f)
        fmt.Println(v, e, s)
    }()
    time.Sleep(time.Second)
    go func() {
        defer wg.Done()
        ch := g.DoChan("joker", f)
        for {
            select {
            case <-ch:
                fmt.Println("get success")
                break
            case <-time.After(time.Second):
                fmt.Println("1 second wait")
            }
        }
    }()

    wg.Wait()
}

这是一个比较典型的例子,可以先试试panic,对于panic,Do和DoChan都有对应处理,再试试Goexit就会发现,第二个goroutine泄露了会一直循环下去,原因是使用了DoChan,个人理解没有将Goexit传递到所有等待请求结果的其他请求中

总结

  • go panic的效果
  • defer的骚操作
  • 使用锁来进行互斥,以及最小粒度的保护临界区
  • sync.WaitGroup的互斥和同步用法
    使用Do方法,如果fn执行时间比较长,在并发度比较高的情况下会导致比较多的goroutine阻塞,可以使用DoChan,在等待channel返回的时候通过select和time.After来控制超时,但是切记,如果使用了DoChan,一定要避免死等待
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容

  • singleflight是什么 singleflight是Go官方扩展同步包(golang.org/x/sync/...
    cindywang阅读 4,807评论 0 1
  • 指针 指针就是地址,指针变量就是存储地址的变量 *p : 解引用,间接引用 栈帧:用来给函数运行提供内存空间,取内...
    雪上霜阅读 297评论 0 0
  • 2024年1月16日再看 1.18 源代码的时候发现代码已经发生了变化。本文档的代码大概在 1.14 左右。 原书...
    Robin92阅读 1,823评论 7 7
  • 官方包的注释: sync包提供基础的同步原语,sync.Mutext、sync.RWMutex、sync.Wait...
    thepoy阅读 608评论 0 1
  • 本文从上下文Context、同步原语与锁、Channel、调度器四个方面介绍Go语言是如何实现并发的。本文绝大部分...
    彦帧阅读 1,568评论 1 3