资源链接
介绍
- 就是一个池子,可以暂时存储对象并查询对象。
- 任何存储在池子里面的对象可能会被自动移除(GC时),如果此时池子仅仅保留了引用,那么对象将会被 deallocated。
- 一个池子可以被多个goroutines同时安全地访问。
引发的关于 Cache 和 Pool 的争论
gc(garbage collector)
- Go 是自动垃圾回收的,减少了程序员的负担
- GC 是一把双刃剑,带来便利但是也增加了开销,使用不当会严重影响程序的性能
- 高性能场景下,不能任意产生太多的垃圾(GC 负担重,会影响性能)
如何解决GC负担重?
- 避免大家重复造轮子,开发了 Pool 包来保存和复用临时对象,以减少内存分配,降低 GC 压力
-
http://echo.labstack.com/guide/routing
Echo 的路由使用了 sync pool 来重复利用内存并且几乎达到了零内存占用 - gin 的 context 通过 pool 来 get 和 put,也就是使用了 sync.Pool 进行维护
两种使用方式
// 方法一
package main
import(
"fmt"
"log"
"runtime"
"sync"
)
func main(){
p := &sync.Pool{
New: func() interface{} {
return 0
},
}
a := p.Get().(int)
p.Put(1)
b := p.Get().(int)
fmt.Println(a, b) // 输出 0 1
p.Put(3)
p.Put(4)
p.Put(5)
log.Println(p.Get()) // 返回 3 4 5 中的任意一个
// 主动调用 GC, pool 中的对象会被列入 victim 缓存
runtime.GC()
c := p.Get().(int)
log.Println(c) // 拿到的是 4
// 再次调用 GC, pool 中的 victim 缓存会被删除
runtime.GC()
c = p.Get().(int)
log.Println(c) // 拿到的是 0
}
// 方法二
package main
import(
"fmt"
"sync"
)
func main(){
// 如果我们不指定 New 函数的话,会返回 nil
p := &sync.Pool{}
a := p.Get()
if a == nil {
a = func() interface{} {
return 0
}
}
p.Put(1)
b := p.Get().(int)
fmt.Println(a, b) // 输出 0 1
}
Pool 源码解读
Pool 结构
type Pool struct {
// 用来标记,当前的 struct 是不能够被 copy 的
noCopy noCopy
// P 个固定大小的 poolLocal 数组,每个 P 拥有一个空间
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// 上面数组的大小,即 P 的个数
localSize uintptr // size of the local array
// 同 local 和 localSize,只是在 gc 的过程中保留一次
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 自定义一个 New 函数,然后可以在 Get 不到东西时,自动创建一个
New func() interface{}
}
因为 Pool 不希望被复制,所以结构体里有一个 noCopy 的字段,使用 go vet 工具可以检测到用户代码是否复制了 Pool。noCopy
是 go1.7 开始引入的一个静态检查机制。它不仅仅工作在运行时或标准库,同时也对用户代码有效。用户只需实现这样的不消耗内存、仅用于静态分析的结构,来保证一个对象在第一次使用后不会发生复制。
// Local per-P Pool appendix.
type poolLocalInternal struct {
// private 存储一个 Put 的数据,pool.Put() 操作优先存入 private,如果private有信息,才会存入 shared
private interface{} // Can be used only by the respective P.
// 存储一个链表,用来维护 pool.Put() 操作加入的数据,每个 P 可以操作自己 shared 链表中的头部,而其他的 P 在用完自己的 shared 时,可能会来偷数据,从而操作链表的尾部
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
// unsafe.Sizeof(poolLocal{}) // 128 byte(1byte = 8 bits)
// unsafe.Sizeof(poolLocalInternal{}) // 32 byte(1byte = 8 bits)
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
Get()
func (p *Pool) Get() interface{} {
...
l, pid := p.pin() // 获取当前 pool 的 poolLocal,也就是 p.local[pid]
x := l.private // 判断当前的临时变量是否有值,有则立即返回
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead() // 从 shared poolChain 链表里面获取头部数据
if x == nil {
x = p.getSlow(pid) // 本线程的 Pool 没有数据了,就去其他线程的 Pool 池取
}
}
...
// 无法获取到值,则 New 一个,未设定 New 函数则返回 nil
if x == nil && p.New != nil {
x = p.New()
}
return x
}
Put()
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
...
l, _ := p.pin() // 获取当前 pool 的 poolLocal,也就是 p.local[pid],这里不关心 pid
// 优先写入 private 变量
if l.private == nil {
l.private = x
x = nil
}
// 如果 private 有值,则写入 shared poolChain 链表
if x != nil {
l.shared.pushHead(x)
}
...
}
indexLocal()
获取线程 pid (i) 对应的 poolLocal,因为是个数组,即 0+offset
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
getSlow()
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs. 从其他的线程偷数据
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size)) // 从当前 pid 的 local 开始遍历其他线程的 pool 池(poolLocal),遍历一个圈。返回值为其他线程的 pool.poolLocal
// 从尾部获取数据
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 当无法从其他线程的 poolLocal 得到信息,则从 victim 缓存区域获取(和 local 一样的逻辑)
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果 victim 全空,则 victimSize 设置为 0,防止下次再次遍历
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
pin()
pin 函数 pins 当前 goroutine 的 P,防止 preemption
returns poolLocal pool for the P and the P's id.
调用方当完成对 pool 的操作后,必须调用 runtime_procUnpin()
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid // 获取当前线程的 poolLocal 和 pid
}
return p.pinSlow()
}
// 该函数链接于 runtime.proc.go:sync_runtime_procPin 函数
func runtime_procPin(){}
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
return procPin()
}
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
//go:nosplit
func procUnpin() {
_g_ := getg()
_g_.m.locks--
}
pinSlow()
func (p *Pool) pinSlow() (*poolLocal, int) {
// 由于调用该函数前 pin 过,这里需要 unpin,否则 allPoolsMu 无法被加锁
runtime_procUnpin()
// 对 allPools 变量加锁,来操作 allPools,这里存储所有的 pool
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 重新 pin 当前线程的 P
pid := runtime_procPin()
// pin 后 poolCleanup 不会被调用
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// 如果 GOMAXPROCS 在 GCs 时发生了改变,我们重新分配 local,并设置 localSize
size := runtime.GOMAXPROCS(0) // 获取线程数
local := make([]poolLocal, size) // 每个线程一个 poolLocal,所以这里设置为 size 个大小的数组
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
poolCleanUp()
该函数在 init 函数中注册到 runtime 中,在调用 GC 前,函数被调用
func poolCleanup() {
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 所有的池都丢掉主缓存,并数据移动到 victim 缓存
oldPools, allPools = allPools, nil
}
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// 该函数链接于 runtime.mgc.go:sync_runtime_registerPoolCleanup
func runtime_registerPoolCleanup(cleanup func())
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
总结
sync.Pool 的特性
- 池不能够指定大小,大小只受限于 GC 的临界值(GOMAXPROCS)
- 对象最大的缓存周期是两个 GC 周期,每次 GC ,当前的 primary cache 会被转移到 victim cache,primary cache 清空,而原来 victim cache 被释放
- 取值顺序:当前 P 的 primary cache(local)的 poolLocal.private → 当前 P 的 primary cache(local)的 poolLocal.shared.head → 其他 P 的主存(local)的 poolLocal.shared.tail → 当前 P 的 victim cache(victim) 的 poolLocal.private → 当前 P 的 victim cache(victim) 的 poolLocal.shared.head → 其他 P 的主存(local)的 poolLocal.shared.tail → p.New() → nil
- 插入顺序:当前 P 的 primary cache(local)的 poolLocal.private → 当前 P 的 primary cache(local)的 poolLocal.shared.head
Use sync.Pool
- How to implement Memory Pooling in Golang
- How did I improve latency by 700% using sync.Pool
- 【译】CockroachDB GC优化总结
- Golang 优化之路
以下是视频
- Mastering Go Programming: Syncs and Locks | packtpub,com
-
justforfunc #37: sync.Pool from the pool
优化的情况 - SREcon17 Asia/Australia: Golang's Garbage
- Google Go 团队如何进行 CodeReview