在Go1.14版本开始实现了 基于信号的协程抢占调度
模式,在此版本以前执行以下代码是永远也无法执行完成。
package main
import (
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(1)
go func() {
for {
}
}()
time.Sleep(time.Millisecond)
println("OK")
}
原因很简单:在main函数里只有一个CPU,从上到下执行到 time.Sleep()
函数的时候,会将 main goroutine
放出入运行队列,让了P,开始执行匿名函数,但匿名函数是一个for循环,没有任何IO语句,也就是无法引起调度,所以当前仅有的一个P永远被其占用,导致无法打印OK。
这个问题在1.14版本开始有所改变,主要是因为引入了基于信号的抢占模式
。在程序启动时,初始化信号,并在 runtime.sighandler
函数注册了 SIGURG
信号的处理函数 runtime.doSigPreempt
,然后在触发垃圾回收的栈扫描时,调用函数挂起goroutine,并向M发送信号,M收到信号后,会让当前goroutine陷入休眠继续执行其他的goroutine。
本篇从发送与接收信息并处理两方面来看一下它是如何实现的。
发送信号
在上篇文章(认识sysmon监控线程)介绍 sysmon 的时候,我们知道监控线程会在无P的情况下一直运行,定期扫描所有的P,将长时间运行的G 进行解除。
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
......
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
......
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
}
......
}
通过 retake()
函数对所有 P
进行检查。
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
// 如果 P 运行得太久, 则抢占 G
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 如果超过了10ms就需要进行抢占了
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
......
}
}
如果一个 P
的 pd.schedwhen+forcePreemptNS <= now
,则说明P上的G运行的时间太长,则需要通过函数 preemptone() 进行抢占。
// src/runtime/proc.go
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
// 被抢占的 goroutine
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
// 设置g的抢占标识
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
// 设置栈抢占 stackPreempt,这是一个很大的值比任何栈都大,
// 在 goroutine 内部的每次调用都会比较栈顶指针和 g.stackguard0,用以判断是否发生了栈溢出。
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
// 对P发一个异步抢占请示
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
这里主要是设备两个抢占标识位,对于信号调用了 preemptM() 函数发送一个抢占请求到m。
// src/runtime/signal_unix.go
const preemptMSupported = true
// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
......
if atomic.Cas(&mp.signalPending, 0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, 1)
}
// If multiple threads are preempting the same M, it may send many
// signals to the same M such that it hardly make progress, causing
// live-lock problem. Apparently this could happen on darwin. See
// issue #37741.
// Only send a signal if there isn't already one pending.
signalM(mp, sigPreempt)
}
......
}
这里又调用了 signalM()
函数。
// src/runtime/os_darwin.go
func signalM(mp *m, sig int) {
pthread_kill(pthread(mp.procid), uint32(sig))
}
对于后面的 pthread_kill()
函数我们就不再继续看了。
以上就是发送抢占信号的基本流程,相应有也就应该有处理抢占信号的逻辑。
处理信息
给m发送的信息是 sigPreempt ,它是一个常量
const sigPreempt = _SIGURG
对于它的详细说明,可以参考官方注释文档。
程序在开始运行的时候,
// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
if !preinit {
// It's now OK for signal handlers to run.
signalsOK = true
}
// For c-archive/c-shared this is called by libpreinit with
// preinit == true.
if (isarchive || islibrary) && !preinit {
return
}
for i := uint32(0); i < _NSIG; i++ {
t := &sigtable[i]
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}
// We don't need to use atomic operations here because
// there shouldn't be any other goroutines running yet.
fwdSig[i] = getsig(i)
if !sigInstallGoHandler(i) {
// Even if we are not installing a signal handler,
// set SA_ONSTACK if necessary.
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
sigInitIgnored(i)
}
continue
}
handlingSig[i] = 1
setsig(i, funcPC(sighandler)) // 注册信号对应的回调方法
}
}
go 在启动的时候会把所有的信息都注册一次。
再通过 sighandler() 函数进行注册。
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
......
// 如果是抢占信号
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
......
}
然后是执行抢占信号事件
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
// 执行抢占
ctxt.pushCall(funcPC(asyncPreempt), newpc)
}
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
}
会先判断异步安全点 isAsyncSafePoint()
, 然后返回一个可以插入调用 asyncPreempt()
的PC。
本文基于go version 1.16