最近在看左神新书 《Go 语言设计与实现》的垃圾收集器时产生一个疑惑,花了点时间搞清楚了记录一下。
Go 语言垃圾回收的实现使用了标记清除算法,将对象的状态抽象成黑色(活跃对象)、灰色(活跃对象中间状态)、白色(潜在垃圾对象也是所有对象的默认状态)三种,注意没有具体的字段标记颜色。
整个标记过程就是把白色对象标黑的过程:
1.首先将 ROOT 根对象(包括全局变量、goroutine 栈上的对象等)放入到灰色集合
2.选一个灰色对象,标成黑色,将所有可达的子对象放入到灰色集合
3.重复2的步骤,直到灰色集合中为空
下图是书上的插图,看上去是一个典型的深度优先搜索的算法。
下图是刘丹冰写的《Golang 修养之路》的插图,看上去是一个典型的广度优先搜索的算法。
我疑惑的点在于这个标记过程是深度优先算法还是广度优先算法,因为很多文章博客对此都没有很清楚的说明,作为学习者这种细节其实也不影响对整个 GC 流程的理解,但是这种细节我非常喜欢扣:)
对着书和源码摸索着大致找到了一个结果是深度优先。下面看下大致的过程,源码基于1.15.2版本:
gcStart 是 Go 语言三种条件触发 GC 的共同入口
func gcStart(trigger gcTrigger) {
......
// 启动后台标记任务
gcBgMarkStartWorkers()
......
}
启动后台标记任务
func gcBgMarkStartWorkers() {
// Background marking is performed by per-P G's. Ensure that
// each P has a background GC G.
for _, p := range allp {
if p.gcBgMarkWorker == 0 {
// 为每个处理器创建用于执行后台标记任务的 Goroutine
go gcBgMarkWorker(p)
......
}
}
}
为每个处理器创建用于执行后台标记任务的 Goroutine
func gcBgMarkWorker(_p_ *p) {
......
for {
// Go to sleep until woken by gcController.findRunnable.
// We can't releasem yet since even the call to gopark
// may be preempted.
// 让当前 G 进入休眠
gopark(func(g *g, parkp unsafe.Pointer) bool {
park := (*parkInfo)(parkp)
// The worker G is no longer running, so it's
// now safe to allow preemption.
releasem(park.m.ptr())
// If the worker isn't attached to its P,
// attach now. During initialization and after
// a phase change, the worker may have been
// running on a different P. As soon as we
// attach, the owner P may schedule the
// worker, so this must be done after the G is
// stopped.
if park.attach != 0 {
p := park.attach.ptr()
park.attach.set(nil)
// cas the worker because we may be
// racing with a new worker starting
// on this P.
// 把当前的G设到P的gcBgMarkWorker成员
if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) {
// The P got a new worker.
// Exit this worker.
return false
}
}
return true
}, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0)
......
systemstack(func() {
// Mark our goroutine preemptible so its stack
// can be scanned. This lets two mark workers
// scan each other (otherwise, they would
// deadlock). We must not modify anything on
// the G stack. However, stack shrinking is
// disabled for mark workers, so it is safe to
// read from the G stack.
// 设置G的状态为等待中,这样它的栈可以被扫描
casgstatus(gp, _Grunning, _Gwaiting)
switch _p_.gcMarkWorkerMode {
default:
throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
case gcMarkWorkerDedicatedMode:
// 这个模式下P应该专心执行标记
gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
if gp.preempt {
// We were preempted. This is
// a useful signal to kick
// everything out of the run
// queue so it can run
// somewhere else.
// 被抢占时把本地运行队列中的所有G都踢到全局运行队列
lock(&sched.lock)
for {
gp, _ := runqget(_p_)
if gp == nil {
break
}
globrunqput(gp)
}
unlock(&sched.lock)
}
// Go back to draining, this time
// without preemption.
// 继续执行标记
gcDrain(&_p_.gcw, gcDrainFlushBgCredit)
case gcMarkWorkerFractionalMode:
// 执行标记
gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
case gcMarkWorkerIdleMode:
// 执行标记, 直到被抢占或者达到一定的量
gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
}
// 恢复G的状态到运行中
casgstatus(gp, _Gwaiting, _Grunning)
})
......
}
}
上面休眠的 G 会在调度循环中检查并唤醒执行
func schedule() {
......
// 正在 GC,去找 GC 的 g
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
......
// 开始执行
execute(gp, inheritTime)
}
执行标记
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
.......
// Drain heap marking jobs.
// Stop if we're preemptible or if someone wants to STW.
for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
// Try to keep work available on the global queue. We used to
// check if there were waiting workers, but it's better to
// just keep work available than to make workers wait. In the
// worst case, we'll do O(log(_WorkbufSize)) unnecessary
// balances.
// 将本地一部分工作放回全局队列中
if work.full == 0 {
gcw.balance()
}
// 获取待扫描的对象,一个 fast path,没有则走 slow path
b := gcw.tryGetFast()
if b == 0 {
b = gcw.tryGet()
if b == 0 {
// Flush the write barrier
// buffer; this may create
// more work.
wbBufFlush(nil, 0)
b = gcw.tryGet()
}
}
if b == 0 {
// Unable to get work.
break
}
// 扫描获取到的对象
scanobject(b, gcw)
......
}
gcw 是每个 P 独有的所以不用担心并发的问题 和 GMP、mcache 一样设计,减少锁竞争
func (w *gcWork) tryGetFast() uintptr {
wbuf := w.wbuf1
if wbuf == nil {
return 0
}
if wbuf.nobj == 0 {
return 0
}
// 从 尾部 取出一个对象,对象数减一,重点是尾部
wbuf.nobj--
return wbuf.obj[wbuf.nobj]
}
// slow path
func (w *gcWork) tryGet() uintptr {
wbuf := w.wbuf1
if wbuf == nil {
w.init()
wbuf = w.wbuf1
// wbuf is empty at this point.
}
// 第一个 buf 为空
if wbuf.nobj == 0 {
// 交换第一和第二的 buf
w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
wbuf = w.wbuf1
// 都为空
if wbuf.nobj == 0 {
owbuf := wbuf
// 尝试在全局列表中获取一个不为空的 buf
wbuf = trygetfull()
// 全局也没有
if wbuf == nil {
return 0
}
// 把之前的空 buf 放到全局列表中
putempty(owbuf)
w.wbuf1 = wbuf
}
}
// 返回 buf 里最后一个对象
wbuf.nobj--
return wbuf.obj[wbuf.nobj]
}
尝试在全局列表中获取一个不为空的 buf
// trygetfull tries to get a full or partially empty workbuffer.
// If one is not immediately available return nil
//go:nowritebarrier
func trygetfull() *workbuf {
b := (*workbuf)(work.full.pop())
if b != nil {
b.checknonempty()
return b
}
return b
}
这是官方实现的无锁队列:)涨见识了,for 循环加原子操作实现栈的 pop
// lfstack is the head of a lock-free stack.
func (head *lfstack) pop() unsafe.Pointer {
for {
old := atomic.Load64((*uint64)(head))
if old == 0 {
return nil
}
node := lfstackUnpack(old)
next := atomic.Load64(&node.next)
if atomic.Cas64((*uint64)(head), old, next) {
return unsafe.Pointer(node)
}
}
}
到这里从灰色集合中获取待扫描的对象逻辑说完了。找到对象了接着就是 scanobject(b, gcw) 了,里面有两段逻辑要注意
func scanobject(b uintptr, gcw *gcWork) {
// Find the bits for b and the size of the object at b.
//
// b is either the beginning of an object, in which case this
// is the size of the object to scan, or it points to an
// oblet, in which case we compute the size to scan below.
// 获取 b 的 heapBits 对象
hbits := heapBitsForAddr(b)
// 获取 span
s := spanOfUnchecked(b)
// span 对应的对象大小
n := s.elemsize
if n == 0 {
throw("scanobject n == 0")
}
// 大于 128KB 的大对象 为了更高的性能 打散成小对象,加入到灰色集合中待扫描
if n > maxObletBytes {
......
// Enqueue the other oblets to scan later.
// Some oblets may be in b's scalar tail, but
// these will be marked as "no more pointers",
// so we'll drop out immediately when we go to
// scan those.
for oblet := b + maxObletBytes; oblet < s.base()+s.elemsize; oblet += maxObletBytes {
if !gcw.putFast(oblet) {
gcw.put(oblet)
}
}
}
// Compute the size of the oblet. Since this object
// must be a large object, s.base() is the beginning
// of the object.
n = s.base() + s.elemsize - b
if n > maxObletBytes {
n = maxObletBytes
}
}
// 一个指针一个指针的扫描
var i uintptr
for i = 0; i < n; i += sys.PtrSize {
// Find bits for this word.
if i != 0 {
// Avoid needless hbits.next() on last iteration.
hbits = hbits.next()
}
// Load bits once. See CL 22712 and issue 16973 for discussion.
bits := hbits.bits()
// During checkmarking, 1-word objects store the checkmark
// in the type bit for the one word. The only one-word objects
// are pointers, or else they'd be merged with other non-pointer
// data into larger allocations.
if i != 1*sys.PtrSize && bits&bitScan == 0 {
break // no more pointers in this object 通过位运算得出已经没有更多的指针了
}
if bits&bitPointer == 0 {
continue // not a pointer 不是指针
}
// Work here is duplicated in scanblock and above.
// If you make changes here, make changes there too.
// 根据偏移算出对象的指针
obj := *(*uintptr)(unsafe.Pointer(b + i))
// At this point we have extracted the next potential pointer. 找到下一个指针了
// Quickly filter out nil and pointers back to the current object.
if obj != 0 && obj-b >= n {
// Test if obj points into the Go heap and, if so,
// mark the object.
//
// Note that it's possible for findObject to
// fail if obj points to a just-allocated heap
// object because of a race with growing the
// heap. In this case, we know the object was
// just allocated and hence will be marked by
// allocation itself.
// 请注意,如果 obj 指向刚刚分配的堆对象,则 findObject 可能会因为堆增长的竞争而失败。
// 在这种情况下,我们知道对象刚刚被分配,因此将由分配本身标记。
// 标记期间分配的对象直接标位黑色(混合写屏障)
// 根据索引位置找到对象进行标色
if obj, span, objIndex := findObject(obj, b, i); obj != 0 {
greyobject(obj, b, i, span, gcw, objIndex)
}
}
}
......
}
根据索引位置找到对象进行标色
func greyobject(obj, base, off uintptr, span *mspan, gcw *gcWork, objIndex uintptr) {
// obj should be start of allocation, and so must be at least pointer-aligned.
if obj&(sys.PtrSize-1) != 0 {
throw("greyobject: obj not pointer-aligned")
}
mbits := span.markBitsForIndex(objIndex)
// 检查是否所有可到达的对象都被正确标记的机制, 仅出错使用
if useCheckmark {
if !mbits.isMarked() {
printlock()
print("runtime:greyobject: checkmarks finds unexpected unmarked object obj=", hex(obj), "\n")
print("runtime: found obj at *(", hex(base), "+", hex(off), ")\n")
// Dump the source (base) object
gcDumpObject("base", base, off)
// Dump the object
gcDumpObject("obj", obj, ^uintptr(0))
getg().m.traceback = 2
throw("checkmark found unmarked object")
}
hbits := heapBitsForAddr(obj)
if hbits.isCheckmarked(span.elemsize) {
return
}
hbits.setCheckmarked(span.elemsize)
if !hbits.isCheckmarked(span.elemsize) {
throw("setCheckmarked and isCheckmarked disagree")
}
} else {
if debug.gccheckmark > 0 && span.isFree(objIndex) {
print("runtime: marking free object ", hex(obj), " found at *(", hex(base), "+", hex(off), ")\n")
gcDumpObject("base", base, off)
gcDumpObject("obj", obj, ^uintptr(0))
getg().m.traceback = 2
throw("marking free object")
}
// If marked we have nothing to do.
if mbits.isMarked() {
return
}
// 设置标记 黑色
mbits.setMarked()
// Mark span. 标记 span
arena, pageIdx, pageMask := pageIndexOf(span.base())
if arena.pageMarks[pageIdx]&pageMask == 0 {
atomic.Or8(&arena.pageMarks[pageIdx], pageMask)
}
// If this is a noscan object, fast-track it to black
// instead of greying it.
if span.spanclass.noscan() {
gcw.bytesMarked += uint64(span.elemsize)
return
}
}
// Queue the obj for scanning. The PREFETCH(obj) logic has been removed but
// seems like a nice optimization that can be added back in.
// There needs to be time between the PREFETCH and the use.
// Previously we put the obj in an 8 element buffer that is drained at a rate
// to give the PREFETCH time to do its work.
// Use of PREFETCHNTA might be more appropriate than PREFETCH
// 尝试将对象存入 gcwork 的缓存中,或全局队列中,用作后面处理
if !gcw.putFast(obj) {
gcw.put(obj)
}
}
这里有一点要特别说明的,我思考了好久才想明白(菜是真菜),greyobject() 方法名很迷惑,标灰对象?其实 mspan 中使用 gcmarkBits 位图代表是否被垃圾回收扫描的状态,只有黑色和白色,mbits.setMarked() 设置的就是 gcmarkBits 对应的 index 位为 1。灰色是抽象出来的中间状态,没有专门的标灰的逻辑,放入到 gcw 中就是标灰。greyobject() 做的事情就是把自身 位置 标成黑色,代表它存活。最后把当前 位置 保存的 对象 放入到灰色集合,是为了扫描这个对象后续的引用。这里 位置 和 对象 的关系有点绕,需要细品。
尝试存入 gcwork 的缓存中,或全局队列中
func (w *gcWork) putFast(obj uintptr) bool {
w.checkPut(obj, nil)
wbuf := w.wbuf1
if wbuf == nil {
return false
} else if wbuf.nobj == len(wbuf.obj) {
return false
}
// 在尾部添加 注意
wbuf.obj[wbuf.nobj] = obj
wbuf.nobj++
return true
}
// slow path
func (w *gcWork) put(obj uintptr) {
w.checkPut(obj, nil)
flushed := false
wbuf := w.wbuf1
// Record that this may acquire the wbufSpans or heap lock to
// allocate a workbuf.
lockWithRankMayAcquire(&work.wbufSpans.lock, lockRankWbufSpans)
lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
if wbuf == nil {
w.init()
wbuf = w.wbuf1
// wbuf is empty at this point.
} else if wbuf.nobj == len(wbuf.obj) {
w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
wbuf = w.wbuf1
if wbuf.nobj == len(wbuf.obj) {
putfull(wbuf)
w.flushedWork = true
wbuf = getempty()
w.wbuf1 = wbuf
flushed = true
}
}
// 在尾部添加 注意
wbuf.obj[wbuf.nobj] = obj
wbuf.nobj++
......
}
func putfull(b *workbuf) {
b.checknonempty()
work.full.push(&b.node)
}
无锁队列,for 循环加原子操作实现栈的 push
func (head *lfstack) push(node *lfnode) {
node.pushcnt++
new := lfstackPack(node, node.pushcnt)
if node1 := lfstackUnpack(new); node1 != node {
print("runtime: lfstack.push invalid packing: node=", node, " cnt=", hex(node.pushcnt), " packed=", hex(new), " -> node=", node1, "\n")
throw("lfstack.push")
}
for {
old := atomic.Load64((*uint64)(head))
node.next = old
if atomic.Cas64((*uint64)(head), old, new) {
break
}
}
}
到这里把灰色对象标黑就完成了,又放回灰色集合接着扫下一个指针。