golang sync.pool源码分析无锁队列实现

何为对象池

在开发过程中,代码里头我们经常会创建和销毁同一类对象。而频繁的创建和销毁开销还是很大的,常见的优化手段就是创建对象池对象池就是提前创建很多对象,使用过的对象不销毁保存起来,等下次请求在重复使用该对象。我们今天分析的主角就是:Go 标准库sync.Pool(go1.16.2)。但是sync.Pool中的对象会被GC所清理掉。

sync.pool使用

func main() {
  count:=0
  pool := sync.Pool{New:func() interface{} {
        count++
        return count
  }}
    v1 := pool.Get()
    fmt.Printf("value 1: %v\n", v1)
    pool.Put(10)
    pool.Put(11)
    pool.Put(12)
    v2 := pool.Get()
    fmt.Printf("value 2: %v\n", v2)
  // 第一次gc
    runtime.GC()
    v3 := pool.Get()
    fmt.Printf("value 3: %v\n", v3)
    runtime.GC()
  // 第二次gc
    v4 := pool.Get()
    fmt.Printf("value 4: %v\n", v4)
    pool.New = nil
    v5 := pool.Get()
    fmt.Printf("value 5: %v\n", v5)
}

指定New回调,当Get没有获取到的时候,就会调用该方法。这里调用GC,目的是验证sync.pool会被GC给清理掉,并且我们会看到第二次GC后,才会正真意义的释放

Pool源码分析

type Pool struct {
   // go vet检测使用
   noCopy noCopy
     // 指向数组实际的类型[P]poolLoca
   local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
   // 数组大小
   localSize uintptr        // size of the local array
   // 受害者缓存 类似分代垃圾回收的思想
   victim     unsafe.Pointer // local from previous cycle
   victimSize uintptr        // size of victims array
   // get获取不到的创建新的对象使用
   New func() interface{}
}

type poolLocal struct {
    poolLocalInternal
    // 占位使用,防止cpu cache line上发生false sharing
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
    private interface{} // Can be used only by the respective P.
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

Put保存对象

func (p *Pool) Put(x interface{}) {
   // 数据为nil不保存
   if x == nil {
      return
   }
   // data race
   // 绑定g和p,禁止当前P被抢占
   l, _ := p.pin()
   // 将x赋值给private
   if l.private == nil {
      l.private = x
      x = nil
   }
   // private已经赋值了,那只能放本地的队列中了
   if x != nil {
      l.shared.pushHead(x)
   }
   // 解除抢占
   runtime_procUnpin()
   // data race
}

此处省略data race校验相关的代码。

  • 调用pin()绑定p上锁,防止当前的p被其他的g抢占。
  • 保存数据,先存private,再存shared队列中的。pushHead我们后面分解
  • runtime_procUnpin可以理解为解锁,解除抢占

Get获取对象

func (p *Pool) Get() interface{} {
   // .... data race
   // 绑定g和p,禁止当前P被抢占 
   l, pid := p.pin()
   x := l.private
   l.private = nil
   if x == nil {
      // Try to pop the head of the local shard. We prefer
      // the head over the tail for temporal locality of
      // reuse.
      // 本地的用pophead不用popTail,popTail会浪费空间
      x, _ = l.shared.popHead()
      if x == nil {
        // 从其他的P获取队列,只偷一次,不确定会不会出现P没偷完的情况。(还需恶补GMP模型)
         x = p.getSlow(pid)
      }
   }
   // 解除抢占
   runtime_procUnpin()
     // .... data race
   if x == nil && p.New != nil {
      x = p.New()
   }
   return x
}

此处省略data race校验相关的代码。

  1. 调用pin()绑定p上锁,防止当前的p被其他的g抢占
  2. 取出private赋值给x,如果x为空,继续取队列shared.popHead()的值。如果获取不到在从其他的p获取一个poolLocal。
  3. getSlow中先从其他P获取,如果其他p获取不到在从cache获取数据,最后都获取不到,就调用New生成

详细看pin()怎么获取poolLocal

func (p *Pool) pin() (*poolLocal, int) {
   // GMP会获取到对应的p。我们可以理解跟进程pid类似
   pid := runtime_procPin()
   // 获取size大小
   s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
   l := p.local                              // load-consume
   if uintptr(pid) < s {
      // 根据pid直接获取对应的poolLocal
      return indexLocal(l, pid), pid
   }
   // 第一次localSize=0的时候或者被gc的时候。
   return p.pinSlow()
}
// runtime_procPin最终调用,获取到GMP中P的id
func procPin() int {
    _g_ := getg()
    mp := _g_.m
    mp.locks++
    return int(mp.p.ptr().id)
}

procPin()这里mp.locks加锁,禁止其他g抢占p。之后获取原子性的获取p.localsize,这里只有gc的时候,才会修改这个值。为什么加上原子性的操作? 如果本地有直接indexLocal获取,indexLocal(取p.local首地址+当前pid*size大小偏移)获取。 如果本地也获取不到,调用p.pinSlow()。从其他的p获取。第一次或对象被gc的时候,也会调用pinSlow初始化。

pinSlow获取存储poolLocal

image-20210715200935699.png
func (p *Pool) pinSlow() (*poolLocal, int) {
   // 先释放抢占
   runtime_procUnpin()
   // 上锁,全局大锁,上锁更耗时吧,所以上面先释放了
   allPoolsMu.Lock()
   defer allPoolsMu.Unlock()
   // 这里应该会获取到别的p
   pid := runtime_procPin()
   // poolCleanup won't be called while we are pinned.
   s := p.localSize
   l := p.local
   // 在判断一把,自身p没有,也可能自身的p被其他的G抢占了。设置了pid有数据
   if uintptr(pid) < s {
      return indexLocal(l, pid), pid
   }
   // 开始的时候,gc后,获取别的p的时候,local肯定都是nil
   if p.local == nil {
      allPools = append(allPools, p)
   }
   // 获取跟CPU核数相同的P
   size := runtime.GOMAXPROCS(0)
   local := make([]poolLocal, size)
   // 将切片存到p.local,这里就是为什么我们get的时候,从local获取
   atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
   // 在存localSize大小
   runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
   // 返回对应的poolLocal
   return &local[pid], pid
}
  1. 全局大锁,阻塞等待上锁的时间,比较持久。借用曹大的分析几个 Go 系统可能遇到的锁问题,如果每次请求都new一个sync.Pool,那么每次请求将是灾难性的全局锁加锁。因为新的pool一定会走到pinSlow去创建poolLocal。所以我们需要先runtime_procUnpin释放抢占

  2. 再次获取P,如果其他的P之前已经设置过了。那么local就会有数据了,所以需要在判断一把。

  3. 之后都是初始化操作,给对应的p.local和p.localSize赋值(代码注释),这里为什么需要原子操作,可能gc的时候,会操作。需要在查些资料可能会清晰一些,大家可以一起探讨

数据存储shared.pushHead()

push_pool.gif
func (c *poolChain) pushHead(val interface{}) {
   d := c.head
   if d == nil {
      // 必须是2的倍数?应该跟后续双倍扩容,dequeueLimit判断。不浪费最后的空间
      const initSize = 8 // Must be a power of 2
      d = new(poolChainElt)
      // 只存8个数据。
      d.vals = make([]eface, initSize)
      c.head = d
     // 因为如果你不传二级指针(指针的指针),那么就是指针的副本赋值了,这样tail其实本身没有赋值
      storePoolChainElt(&c.tail, d)
   }
     // 都是对poolDequeue赋值,操作。
   if d.pushHead(val) {
      return
   }
   // 队列满时,双倍扩容
   newSize := len(d.vals) * 2
   // 边界判断一把
   if newSize >= dequeueLimit {
      // Can't make it any bigger.
      newSize = dequeueLimit
   }
     // 双向链表链接prev即指向d->c.head->上一个的自身d2
   d2 := &poolChainElt{prev: d}
   d2.vals = make([]eface, newSize)
   // 把头节点始终指向最新对象
   c.head = d2
   // 并且刚才的下一个指针指向刚创建的对象。如图
   storePoolChainElt(&d.next, d2)
  // 都是对poolDequeue赋值,操作。
   d2.pushHead(val)
}
  • 看图,这里有多个pin获取到pid,并且获取到相应的poolLocal,这时候前置private已经存储。那么就将数据存储到队列中。
  • 当前切片vals存储满时,就扩容poolChainElt。并且增加里头的存储值vals扩容两倍。之后拼接链表
  • 这里双端循环列表的操作思想,和我们当时分析过redigo连接池源码解析连接池的思想都是一样的。这里sync.pool的很多无锁操作思想,还是值得大家思考和学习的。

poolDequeue.pushHead存储具体的val的值

// 存储vals和headTail值
func (d *poolDequeue) pushHead(val interface{}) bool {
  // popTail会抢其他的p来修改headTail值.
    ptrs := atomic.LoadUint64(&d.headTail)
  // 高32位赋给head、低32位赋给tail
    head, tail := d.unpack(ptrs)
  // len(d.vals)大小固定,head每次++。相等就说明队列满了
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        return false
    }
  // 这里因为tail只会++,所以head大小会大于len大小的情况
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    // slot会被其他的p popTail掉,导致队列还是满的???所以才要原子操作,没看懂
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {
        return false
    }
    // 基础校验
    if val == nil {
        val = dequeueNil(nil)
    }
  // 这里unsafe.Pointer存储,指针操作内存块赋值
    *(*interface{})(unsafe.Pointer(slot)) = val
    // 这里就是给head高32位+1
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}
  • 注意其他p会抢占当前执行的队列。gmp里头,偷其他对列的思想。这里前提必须是自身的队列都取完了,后续的popTail我们会分析
  • unpack取高32位给head、低32位赋值给tail
  • 可以学习一下unsafe.Pointer这种存储操作。c艹基操,(*interface{})看成(void*)

先从本地获取数据shared.popHead()

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
  // 一直取,直到当前的队列获取不到数据,这里每个p put和get每次只能一个拥有pin()类似给p加锁
    for d != nil {
    // 从数组中获取数据
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // 后进先出
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}
popHead.gif
  • 当c.head取完,就取下一个的d.pre如图
  • 这里d.next没有清空,没有关系,因为新插入的时候,d.next会指向新的对象

队头取值

func (d *poolDequeue) popHead() (interface{}, bool) {
   var slot *eface
   for {
      // 取headtail
      ptrs := atomic.LoadUint64(&d.headTail)
      head, tail := d.unpack(ptrs)
      // 校验队列是否为空
      if tail == head {
         return nil, false
      }

      // pop出后,head--
      head--
      ptrs2 := d.pack(head, tail)
      // cas比较当前headTail是否有其他的P修改过
      if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
         // 没人修改取出slot
         slot = &d.vals[head&uint32(len(d.vals)-1)]
         break
      }
   }
     // 取出值
   val := *(*interface{})(unsafe.Pointer(slot))
   if val == dequeueNil(nil) {
      val = nil
   }
   // Zero the slot. Unlike popTail, this isn't racing with
   // pushHead, so we don't need to be careful here.
   // 这里说这里是没有race的,其实我也没搞明白为什么没有race
   *slot = eface{}
   return val, true
}
  • 常见的无锁cas操作,因为当前的p的和其他偷过来的p,会同时操作headTail,并且当前的p中head--和偷的p使用popTail() tail--会操作同一个值
cas.gif
  • 这里slot直接赋值为空,官方注释说,这里没有race现象,可以直接清0

队尾取值

func (d *poolDequeue) popTail() (interface{}, bool) {
   var slot *eface
   // 和队头取值cas思路一致,只是tail++
   for {
      ptrs := atomic.LoadUint64(&d.headTail)
      head, tail := d.unpack(ptrs)
      if tail == head {
         // Queue is empty.
         return nil, false
      }
      ptrs2 := d.pack(head, tail+1)
      if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
         // tail也会有大于len的情况.因为每次tail只++
         slot = &d.vals[tail&uint32(len(d.vals)-1)]
         break
      }
   }

   // 取值
   val := *(*interface{})(unsafe.Pointer(slot))
   if val == dequeueNil(nil) {
      val = nil
   }

   // 告知pushHead这个slot已经让出???没明白
   slot.val = nil
   atomic.StorePointer(&slot.typ, nil)
   // At this point pushHead owns the slot.
   return val, true
  • 对比popHead,一个使用*slot = eface{},一个使用atomic.StorePointer。都是清空,都是赋值nil,好像没啥并发问题。 这里存在疑惑???有清楚的小伙伴,欢迎评论区留言,到时候请教yyds-曹大

多线程下,如何获取下一个队列

func (c *poolChain) popTail() (interface{}, bool) {
   // 队尾为空,返回
   d := loadPoolChainElt(&c.tail)
   if d == nil {
      return nil, false
   }

   for {
      // 先取出下一个节点,因为当前d节点获取的时候为空,执行的时候,另外一个p又插入的数据
      d2 := loadPoolChainElt(&d.next)
      // 获取vals切片数据 
      if val, ok := d.popTail(); ok {
         return val, ok
      }
      // 校验d2是否为空
      if d2 == nil {
         return nil, false
      }

      // 原子性cas对比d的值中途是否有改变
      if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
         // popTail为空,并且d2非空。 则清空上一个数据
         storePoolChainElt(&d2.prev, nil)
      }
      d = d2
   }
}

如何做到无锁线程安全的,还是很值得学习的:

  1. 如果先d.popTail取出切片的数据,判断为空。 之后取下一个数据d.next。因为获取d2之前,另外一个P瞬间把d切片写满。 这时候d2不为空。接下去就会把d删除。会造成误删
  2. 必须在poptail前,获取d2,并且d2非空。 popTail也为空,才能证明tail是永久性的为空。然后清除
  3. 会不会出现即使先取出d2非空,然后popTail为空,但是一瞬间,popTail被写满呢。 不会,因为pushHead都是从头部插入,如果d2非空了,那么插入肯定是从d2当前节点(未写满)或者下一个节点d3开始节点,而不会写d这个节点。这里还是很难理解的。大家要结合pushHead只从头插入,并且每次都是递增的插入。如果可以队尾插入,是做不到无锁的。

从其他P偷队列处理,获取数据

func (p *Pool) getSlow(pid int) interface{} {
   // 取出poolLocal数组大小
   size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
   // 切片地址
   locals := p.local                            // load-consume
   // 遍历所有的poolLocal
   for i := 0; i < int(size); i++ {
      // pid+1去取poolLocal,就是当前的p可以取到其他poolLocal
      l := indexLocal(locals, (pid+i+1)%int(size))
      // 偷来的队列,只能从队尾读取数据
      if x, _ := l.shared.popTail(); x != nil {
         return x
      }
   }

   // 在从cache里取数据
   size = atomic.LoadUintptr(&p.victimSize)
   if uintptr(pid) >= size {
      return nil
   }
   locals = p.victim
   l := indexLocal(locals, pid)
   if x := l.private; x != nil {
      l.private = nil
      return x
   }
   for i := 0; i < int(size); i++ {
      l := indexLocal(locals, (pid+i)%int(size))
      if x, _ := l.shared.popTail(); x != nil {
         return x
      }
   }
   // Mark the victim cache as empty for future gets don't bother
   // with it.
   atomic.StoreUintptr(&p.victimSize, 0)

   return nil
}
  • 偷取的队列,只能从队尾获取数据
  • 如果队列数据都为空,则从cache中获取数据

总结

  • Put只能从对头插入,并且每个p只能插入自己的队列,所以可以理解为线程安全的。
  • Get获取先从private -> shared -> 循环其他P -> victim cache -> New
  • poolLocal和GMP中的P大小一致,GMP中的思想,尽可能的提高性能。
  • 多个地方使用无锁思想。这里特别强调为什么每个p来说,是单个生产者,多个消费者模式。并且只能队头插入,如果对尾也可以插入,这种做法是无法做到无锁操作的。
  • 引入victim cache,gc中新老年代的思想,降低 GC 压力的同时提高命中率。

疑问

  • 切片清空的时候,popHead和popTail中,一个使用*slot = eface{}、一个使用atomic.StorePointer。都是赋值为空。还不清楚为什么,虽然说popTail是非线程安全的,但是为什么这么做。

还是有一些优秀的内存池如,groupcachebigcache等后续值得学习

大家可以添加我的wx一起探讨

我是dandyhzh,码字不易,点个小赞,只希望大家能更加明白

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 如果能够将所有内存都分配到栈上无疑性能是最佳的,但不幸的是我们不可避免需要使用堆上分配的内存。我们可以优化使用堆内...
    光华路程序猿阅读 450评论 0 1
  • 参考go语言的官方包sync.Pool的实现原理和适用场景深入Golang之sync.Pool详解伪共享(fals...
    合肥黑阅读 2,250评论 0 0
  • 转自:https://zhuanlan.zhihu.com/p/76812714[https://zhuanlan...
    o0O0o_b39d阅读 654评论 0 0
  • 前言 在 golang 中有一个池,它特别神奇,你只要和它有个约定,你要什么它就给什么,你用完了还可以还回去,但是...
    LinkinStar阅读 19,651评论 3 8
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 124,879评论 2 7