Golang: Sync.Pool 源码解析

Sync.Pool

Sync.Pool

需要提前了解GMPhttps://www.kancloud.cn/aceld/golang/1958305#2GolangGMP_2

简单来说就是Goroutine(Go协程): Thread(线程): Process(调度器)

不在详细展开了, 只针对Pool做一个简单的分析

使用

package main

import "sync"

type Instance struct {
  Id string
}

func main() {
  // new a pool and Type is *Instance
  pool := sync.Pool{
    New: func() interface{} {
      return &Instance{Id: ""}
    },
  }
  
  // get from empty Pool
  ins := pool.Get().(*Instance)
  ins.Id = "1"
  
  // after usage, put back to pool
  pool.Put(ins)
  
  // check if same with var ins
  print(pool.Get().(*Instance).Id)
}

结构体

Pool

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() any
}

Pool是我们实际接触的数据, 其中包含了

  • local 是个数组,长度为 P 的个数。其元素类型是 poolLocal。这里面存储着各个 P 对应的本地对象池。可以近似的看做 [P]poolLocal
  • localSize。代表 local 数组的长度。因为 P 可以在运行时通过调用 runtime.GOMAXPROCS 进行修改, 因此我们还是得通过 localSize 来对应 local 数组的长度。
  • New 就是用户提供的创建对象的函数。这个选项也不是必需。当不填的时候,Get 有可能返回 nil。

poolLocal

// Local per-P Pool appendix.
type poolLocalInternal struct {
    private any       // Can be used only by the respective P.
    shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocal是和每个P绑定的一个存储

  • private是为了能够快速处理, 尤其是类似于经常性的出现Get->Put->Get->Put时, 减少请求双链表存储的次数.
  • shared有两个作用, 第一是作为一个大容量的存储, 第二是其他的P窃取.

这里稍微聊一下poolLocalpad主要是为了加速CPU的Cache Line,使用的是缓存行Padding (谨慎使用)

不要忘记了, poolLocal是一个Core独占的, 那么这个时候, 防止其他的碎片数据一起塞入缓存行就有作用了, 即不会因为碎片数据的频繁更新而刷新Cache Line, 导致出现不命中导致的其他问题.

另外需要注意, 这里的poolChain是结构体, 而不是指针, 原因是poolChain是一个非常短小的结构体

type poolChain struct {
    head *poolChainElt
    tail *poolChainElt
}

方法

Put

func (p *Pool) Put(x any) {
    // 检查x是否为空
    if x == nil {
        return
    }

/*  
    这部分不用看, race.Enabled是用来指示是否启用了竞态检测器。当你使用 -race 标志编译你的Go程序时,竞态检测器会被启用,而这个变量会被设置为 true。
    通常只在调试或测试时使用,而不在生产环境下使用。

    if race.Enabled {
        if fastrandn(4) == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
*/
    // pin函数的作用我们稍后再聊, 简单来说就是获取和`P`绑定的poolLocal
    l, _ := p.pin()
    // 因为是在同一个线程下执行的
    // 所以没有一些互斥锁等等

    // 如果能够直接放入private, 则直接对private赋值
    if l.private == nil {
        l.private = x
    } else {
        // 塞入缓存队列中.
        l.shared.pushHead(x)
    }
    // unpin操作
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
    }
}

Put主要有这么几步

  1. 绑定G -> P
  2. 获取P的id
  3. 根据pid获取p对应的poolLocal
  4. 优先直接放入private, 其次放入缓存列表中

Get

func (p *Pool) Get() any {
/*
    if race.Enabled {
        race.Disable()
    }
*/
    // 绑定G -> P, 并返回P的poolLocal
    l, pid := p.pin()
    // 优先使用private
    x := l.private
    // 不管最后有没有用到, private一定是nil
    l.private = nil

    // 如果为nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        // 尝试从缓存列表中得到一个
        // 尝试从head弹出元素,而不是尾部,这种偏好是基于时间局部性原理。
        x, _ = l.shared.popHead()
        // 如果得不到
        if x == nil {
            // 尝试从其他P中获取, 如果没法获取, 则尝试从victim中获取
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
/*
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
*/
    // 如果没有缓存中没有任何对象, 但是有New函数, 那么尝试直接New一个
    if x == nil && p.New != nil {
        x = p.New()
    }
    // 返回x, 需要注意x存在为nil的可能性
    return x
}

总结

Get主要有这么几步

  1. 绑定G -> P
  2. 获取P的id
  3. 根据pid获取p对应的poolLocal
  4. 优先使用private
  5. 其次使用自己的缓存列表
  6. 再次使用尝试从其他的P里的poolLocal中获取一个
  7. 还不行就从vivtim中复用一个
  8. 如果Pool里没有任何可用对象, New

Other

pin

pin将当前goroutine绑定到P,禁用抢占,并返回P的poolLocal池和P的id。
调用者在使用完池后必须调用runtime_procUnpin()。

<span style="font-weight: bold;" class="bold">实现</span>

func (p *Pool) pin() (*poolLocal, int) {
    // 调用runtime_procPin函数将当前的goroutine固定到一个P上,并获取该P的id。
    pid := runtime_procPin()

    // 加载p.localSize和p.local
    s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    l := p.local                              // load-consume

    // 检查pid是否小于p.localSize。
    if uintptr(pid) < s {
        // 如果是,则返回对应的poolLocal和pid。
        return indexLocal(l, pid), pid
    }
    // 如果是,则返回对应的poolLocal和pid。
    return p.pinSlow()
}

这里做一些解释, pid通常是 0 - GOMAXPROCS的一个值, 用来标记是哪一个线程

localSize的值一般来说就等于GOMAXPROCS

如果uintptr(pid) < s, 就代表着此时的poolLocal已经被初始化过, 那么此时就可以直接返回.

反之就必须要做初始化的工作.

indexLocal

这个函数的作用比较简单, 主要是根据原始指针 + pid偏移, 计算出真正的poolLocal

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

pinSlow

func (p *Pool) pinSlow() (*poolLocal, int) {
    // 调用pinSlow前, 必定调用了pin, 暂时释放, 因为在pin的过程中, 无法对互斥锁进行操作
    runtime_procUnpin()
    // 初始化local, 禁止并发访问
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    // 重新pin
    pid := runtime_procPin()
    // poolCleanup 在pin的过程中, GC不会调用
    // double check
    s := p.localSize
    l := p.local
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // 为了GC时能够管理所有的pool, 会将p放入管理队列中
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    // 获取GOMAXPROCS, 表示最多有多少`P`
    size := runtime.GOMAXPROCS(0)
    // 初始化 pool
    local := make([]poolLocal, size)
    // 之所以只用atomic, 可能是为了防止在多核环境下出现cache不一致问题.
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
    // 返回该`P`所对应的poolLocal, 以及pid
    return &local[pid], pid
}

getSlow

func (p *Pool) getSlow(pid int) any {
    // 获取p的数量
    size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    locals := p.local                            // load-consume
    // Try to steal one element from other procs.
    // 尝试从其他的P的缓存中偷取一个
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.
    // 尝试从victim中复用一个
    size = atomic.LoadUintptr(&p.victimSize)
    // 如果victim中也没有, 直接返回
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Mark the victim cache as empty for future gets don't bother
    // with it.
    // 如果victim中也没有, 直接将其标记为0, 防止其他的P又一次的遍历vivtim
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

poolCleanup

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.

    // Because the world is stopped, no pool user can be in a
    // pinned section (in effect, this has all Ps pinned).

    // Drop victim caches from all pools.
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // Move primary cache to victim cache.
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    // The pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    oldPools, allPools = allPools, nil
}

这个函数会在GC是调用, 主要的作用就是将当前的local刷到victim中, 之所以不直接清理掉, 是因为可能出现大量的New的情况.

可以参考: https://en.wikipedia.org/wiki/Victim_cache

我没太明白, 但是在这里的使用, 我理解主要是用作一个缓冲和平滑作用.

拓展阅读 poolChain

结构体

poolChain

type poolChain struct {
    // head is the poolDequeue to push to. This is only accessed
    // by the producer, so doesn't need to be synchronized.
    // 只有生产者会操作head, 所以不存在并发竞争
    head *poolChainElt

    // tail is the poolDequeue to popTail from. This is accessed
    // by consumers, so reads and writes must be atomic.
    // 可能存在多个消费者使用tail, 因此必须保证原子性的处理tail
    tail *poolChainElt
}

poolDequeue

type poolDequeue struct {
    // headTail packs together a 32-bit head index and a 32-bit
    // tail index. Both are indexes into vals modulo len(vals)-1.
    //
    // tail = index of oldest data in queue
    // head = index of next slot to fill
    //
    // Slots in the range [tail, head) are owned by consumers.
    // A consumer continues to own a slot outside this range until
    // it nils the slot, at which point ownership passes to the
    // producer.
    //
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    // 将head 和 tail 封装成一个, 可以更好的保证原子性
    headTail uint64

    // vals is a ring buffer of interface{} values stored in this
    // dequeue. The size of this must be a power of 2.
    // vals[i].typ is nil if the slot is empty and non-nil
    // otherwise. A slot is still in use until *both* the tail
    // index has moved beyond it and typ has been set to nil. This
    // is set to nil atomically by the consumer and read
    // atomically by the producer.
    // 实际存储的示例 (指针)
    vals []eface
}

poolChainElt

type poolChainElt struct {
    poolDequeue

    // next and prev link to the adjacent poolChainElts in this
    // poolChain.
    //
    // next is written atomically by the producer and read
    // atomically by the consumer. It only transitions from nil to
    // non-nil.
    //
    // prev is written atomically by the consumer and read
    // atomically by the producer. It only transitions from
    // non-nil to nil.
    next, prev *poolChainElt
}

eface

type eface struct {
    typ, val unsafe.Pointer
}

<span style="font-weight: bold;" class="bold">注意</span>

在 Go 语言中,interface{} 是一个空接口,它可以接受任何类型的值。

然而,当我们需要存储一个 interface{} 类型的值时,实际上我们需要存储两个信息:值的类型和值本身。

这是因为 Go 语言的接口是静态类型的,即使是空接口也需要知道值的具体类型。

eface 结构体被用来表示一个空接口的值。它有两个字段:typ 和 val,分别用来存储值的类型和值本身。

这样做的好处是可以显式地管理这两个信息,而不是依赖 Go 语言的运行时系统来管理。

这对于实现低级别的并发数据结构(如poolChain的无锁队列)是非常有用的,因为这样可以更精细地控制内存的使用和同步。

此外,使用 unsafe.Pointer 可以避免 Go 语言的垃圾收集器误判这些值仍然在使用。

当一个值被从队列中移除时,它的 typ 字段会被设置为 nil,这样 Go 语言的垃圾收集器就知道这个值不再被使用,可以安全地回收它。

总的来说,使用 eface 结构体而不是直接使用 interface{} 可以提供更精细的控制,这对于实现高效的并发数据结构是非常重要的。



type poolChainElt struct {
    poolDequeue

    // next and prev link to the adjacent poolChainElts in this
    // poolChain.
    //
    // next is written atomically by the producer and read
    // atomically by the consumer. It only transitions from nil to
    // non-nil.
    //
    // prev is written atomically by the consumer and read
    // atomically by the producer. It only transitions from
    // non-nil to nil.
    next, prev *poolChainElt
}

type poolDequeue struct {
    // headTail packs together a 32-bit head index and a 32-bit
    // tail index. Both are indexes into vals modulo len(vals)-1.
    //
    // tail = index of oldest data in queue
    // head = index of next slot to fill
    //
    // Slots in the range [tail, head) are owned by consumers.
    // A consumer continues to own a slot outside this range until
    // it nils the slot, at which point ownership passes to the
    // producer.
    //
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    headTail uint64

    // vals is a ring buffer of interface{} values stored in this
    // dequeue. The size of this must be a power of 2.
    //
    // vals[i].typ is nil if the slot is empty and non-nil
    // otherwise. A slot is still in use until *both* the tail
    // index has moved beyond it and typ has been set to nil. This
    // is set to nil atomically by the consumer and read
    // atomically by the producer.
    vals []eface
}

方法

poolChain.pushHead

func (c *poolChain) pushHead(val any) {
    d := c.head
    // 如果为初始化
    if d == nil {
        // Initialize the chain.
        const initSize = 8 // Must be a power of 2
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        // 初始化一个, 并且原子性的设置head以及tail
        storePoolChainElt(&c.tail, d)
    }
    // 如果能够写入当前的dequeue中, 则直接返回
    if d.pushHead(val) {
        return
    }

    // The current dequeue is full. Allocate a new one of twice
    // the size.
    // 如果dequeue已经满了
    // 第二层的dequeue直接翻倍
    newSize := len(d.vals) * 2
    // 一层最大有1 << 30个
    if newSize >= dequeueLimit {
        // Can't make it any bigger.
        newSize = dequeueLimit
    }

    // 初始化下一层
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    // 刚初始化, 必定可以直接放入
    d2.pushHead(val)
}

当head已经填满了, 就会生成的head, 同时修改当前的head为创建的, 并将新创建的head的prev设为之前的head

poolChain.popHead

func (c *poolChain) popHead() (any, bool) {
    // 获取当前的head poolChainElt
    d := c.head
    for d != nil {
        // 尝试从当前的head的dequeue中pop一个
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // 如果没法成功, 则尝试
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

需要注意的是目前的实现会有一个问题, 那就是在批量push, 又批量pop时, 可能会频繁的调用loadPoolChainElt(&d.prev)

另外一个问题是, 可能会导致c.head.prev中出现大量的空poolDequeue

对于一些比较大量的使用pool的程序来说, 可能会引入一些问题.

一个可能的做法是

func (c *poolChain) popHead() (any, bool) {
    d := c.head
    for d != nil {
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        prev := loadPoolChainElt(&d.prev)

        // when try to load d.prev, that means the current dequeue is empty
        // try remove the current dequeue from the chain
        // and try to load the previous dequeue
        // atomic do this
        {
            d.prev.next = d.next
            d.next.prev = d.prev
            d.next = nil
            d.prev = nil
        }

        d = prev
    }
    return nil, false
}

但是很难在无锁的情况下实现, 而且可能引入更加复杂的处理.

popTail

func (c *poolChain) popTail() (any, bool) {
    // 获取tail
    d := loadPoolChainElt(&c.tail)
    // 如果没有tail, 即没有初始化过, 直接返回
    if d == nil {
        return nil, false
    }

    for {
        // It's important that we load the next pointer
        // *before* popping the tail. In general, d may be
        // transiently empty, but if next is non-nil before
        // the pop and the pop fails, then d is permanently
        // empty, which is the only condition under which it's
        // safe to drop d from the chain.
        // 获取d 的 next
        d2 := loadPoolChainElt(&d.next)
        // 尝试popTail
        if val, ok := d.popTail(); ok {
            return val, ok
        }
        // 如果tail poolDequeue已经空了, 则返回false
        if d2 == nil {
            // This is the only dequeue. It's empty right
            // now, but could be pushed to in the future.
            return nil, false
        }

        // The tail of the chain has been drained, so move on
        // to the next dequeue. Try to drop it from the chain
        // so the next pop doesn't have to look at the empty
        // dequeue again.
        // 
        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            // We won the race. Clear the prev pointer so
            // the garbage collector can collect the empty
            // dequeue and so popHead doesn't back up
            // further than necessary.
            storePoolChainElt(&d2.prev, nil)
        }
        d = d2
    }
}

poolChain.popTailpoolChain 结构体的一个方法,它的主要作用是从队列的尾部移除并返回一个元素。如果队列为空,它将返回 false。这个方法可以被任意数量的消费者调用。

以下是 poolChain.popTail 方法的详细步骤:

  1. 首先,它通过 loadPoolChainElt(&c.tail) 获取队列的尾部元素 d。如果 dnil,则表示队列为空,直接返回 nil, false
  2. 然后,它进入一个无限循环,尝试从队列的尾部弹出一个元素。在每次循环中,它首先在弹出尾部元素之前加载下一个指针 d2。这是因为在一般情况下,d 可能会暂时为空,但如果在弹出操作之前 next 是非 nil 的,并且弹出操作失败,那么 d 就会永久性地为空。这是从链中删除 d 的唯一条件。
  3. 接着,它尝试通过 d.popTail() 方法从 d 的尾部弹出一个元素。如果成功,那么它将返回弹出的元素和 true
  4. 如果 d2nil,那么这就是唯一的队列。虽然现在它是空的,但是未来可能会有元素被推入,所以返回 nil, false
  5. 如果队列的尾部已经被清空,那么就移动到下一个队列 d2。尝试从链中删除它,这样下一次弹出操作就不必再看到空的队列。这是通过 atomic.CompareAndSwapPointer 实现的。
  6. 如果我们赢得了比赛,那么就清除 d2.prev 指针,这样垃圾收集器就可以收集空的队列,而且 popHead 不必再回退更多。
  7. 最后,将 d 设置为 d2,然后进入下一次循环。
image

注意

poolDequeue 是一个无锁的固定大小的单生产者,多消费者队列。它有三个主要的方法:pushHeadpopHeadpopTail

  1. pushHead(val any) bool 方法:

这个方法将一个元素添加到队列的头部。如果队列已满,它将返回 false。这个方法只能由单个生产者调用。

它首先获取当前的头部和尾部索引,然后检查队列是否已满。如果队列已满,它将返回 false。否则,它将获取头部索引对应的槽位,并检查该槽位是否已被 popTail 方法释放。如果该槽位还未被释放,那么队列实际上仍然是满的,因此返回 false

如果头部槽位是空的,那么生产者就拥有了这个槽位,并将值存入该槽位。然后,它将头部索引加一,这将把槽位的所有权传递给 popTail 方法,并作为写入槽位的存储屏障。

  1. popHead() (any, bool) 方法:

这个方法从队列的头部移除并返回一个元素。如果队列为空,它将返回 false。这个方法只能由单个生产者调用。

它首先获取当前的头部和尾部索引,然后检查队列是否为空。如果队列为空,它将返回 nil, false。否则,它将头部索引减一,并尝试获取头部索引对应的槽位。如果成功,那么它就拥有了这个槽位,并从槽位中读取出值。

然后,它将槽位清零。由于这个方法不与 pushHead 方法竞争,所以这里不需要特别小心。

  1. popTail() (any, bool) 方法:

这个方法从队列的尾部移除并返回一个元素。如果队列为空,它将返回 false。这个方法可以被任意数量的消费者调用。

它首先获取当前的头部和尾部索引,然后检查队列是否为空。如果队列为空,它将返回 nil, false。否则,它将尾部索引加一,并尝试获取尾部索引对应的槽位。如果成功,那么它就拥有了这个槽位,并从槽位中读取出值。

然后,它将槽位的值字段清零,并将类型字段设置为 nil,这将把槽位的所有权传递给 pushHead 方法,并告诉 pushHead 方法这个槽位已经不再使用,可以安全地回收它。

需要注意的是, popTail -> 完全释放slot并不是一个原子性操作.

所以pushHead需要做两个操作:

  1. 查看是否能够获取该槽
  2. 查看popTail是不是已经释放了该槽
  3. pushHeadpopHead在同一时间只会有一个占用, 所以可以不考虑并发问题
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容