Golang 基于信号的异步抢占与处理

在Go1.14版本开始实现了 基于信号的协程抢占调度 模式,在此版本以前执行以下代码是永远也无法执行完成。

package main

import (
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1)
    go func() {
        for {
        }
    }()

    time.Sleep(time.Millisecond)
    println("OK")
}

原因很简单:在main函数里只有一个CPU,从上到下执行到 time.Sleep() 函数的时候,会将 main goroutine 放出入运行队列,让了P,开始执行匿名函数,但匿名函数是一个for循环,没有任何IO语句,也就是无法引起调度,所以当前仅有的一个P永远被其占用,导致无法打印OK。

这个问题在1.14版本开始有所改变,主要是因为引入了基于信号的抢占模式。在程序启动时,初始化信号,并在 runtime.sighandler 函数注册了 SIGURG 信号的处理函数 runtime.doSigPreempt,然后在触发垃圾回收的栈扫描时,调用函数挂起goroutine,并向M发送信号,M收到信号后,会让当前goroutine陷入休眠继续执行其他的goroutine。

本篇从发送与接收信息并处理两方面来看一下它是如何实现的。

发送信号

在上篇文章(认识sysmon监控线程)介绍 sysmon 的时候,我们知道监控线程会在无P的情况下一直运行,定期扫描所有的P,将长时间运行的G 进行解除。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
    ......

    for {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)

        ......

        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    }

    ......

}

通过 retake() 函数对所有 P 进行检查。

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false

        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            // 如果 P 运行得太久, 则抢占 G
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {

                // 如果超过了10ms就需要进行抢占了
                preemptone(_p_)

                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }

        ......
    }

}

如果一个 Ppd.schedwhen+forcePreemptNS <= now ,则说明P上的G运行的时间太长,则需要通过函数 preemptone() 进行抢占。

// src/runtime/proc.go

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }

    // 被抢占的 goroutine
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    // 设置g的抢占标识
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // 设置栈抢占 stackPreempt,这是一个很大的值比任何栈都大,
    // 在 goroutine 内部的每次调用都会比较栈顶指针和 g.stackguard0,用以判断是否发生了栈溢出。
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    // 对P发一个异步抢占请示
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}

这里主要是设备两个抢占标识位,对于信号调用了 preemptM() 函数发送一个抢占请求到m。

// src/runtime/signal_unix.go

const preemptMSupported = true

// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
    ......
    if atomic.Cas(&mp.signalPending, 0, 1) {
        if GOOS == "darwin" || GOOS == "ios" {
            atomic.Xadd(&pendingPreemptSignals, 1)
        }

        // If multiple threads are preempting the same M, it may send many
        // signals to the same M such that it hardly make progress, causing
        // live-lock problem. Apparently this could happen on darwin. See
        // issue #37741.
        // Only send a signal if there isn't already one pending.
        signalM(mp, sigPreempt)
    }
    ......
}

这里又调用了 signalM() 函数。

// src/runtime/os_darwin.go

func signalM(mp *m, sig int) {
    pthread_kill(pthread(mp.procid), uint32(sig))
}

对于后面的 pthread_kill() 函数我们就不再继续看了。

以上就是发送抢占信号的基本流程,相应有也就应该有处理抢占信号的逻辑。

处理信息

给m发送的信息是 sigPreempt ,它是一个常量

const sigPreempt = _SIGURG

对于它的详细说明,可以参考官方注释文档。

程序在开始运行的时候,

// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
    if !preinit {
        // It's now OK for signal handlers to run.
        signalsOK = true
    }

    // For c-archive/c-shared this is called by libpreinit with
    // preinit == true.
    if (isarchive || islibrary) && !preinit {
        return
    }

    for i := uint32(0); i < _NSIG; i++ {
        t := &sigtable[i]
        if t.flags == 0 || t.flags&_SigDefault != 0 {
            continue
        }

        // We don't need to use atomic operations here because
        // there shouldn't be any other goroutines running yet.
        fwdSig[i] = getsig(i)

        if !sigInstallGoHandler(i) {
            // Even if we are not installing a signal handler,
            // set SA_ONSTACK if necessary.
            if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
                setsigstack(i)
            } else if fwdSig[i] == _SIG_IGN {
                sigInitIgnored(i)
            }
            continue
        }

        handlingSig[i] = 1
        setsig(i, funcPC(sighandler)) // 注册信号对应的回调方法
    }
}

go 在启动的时候会把所有的信息都注册一次。

再通过 sighandler() 函数进行注册。

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    ......
    // 如果是抢占信号
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        // Might be a preemption signal.
        doSigPreempt(gp, c)
        // Even if this was definitely a preemption signal, it
        // may have been coalesced with another signal, so we
        // still let it through to the application.
    }
    ......
}

然后是执行抢占信号事件

// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
    // Check if this G wants to be preempted and is safe to
    // preempt.
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.

            // 执行抢占
            ctxt.pushCall(funcPC(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)

    if GOOS == "darwin" || GOOS == "ios" {
        atomic.Xadd(&pendingPreemptSignals, -1)
    }
}

会先判断异步安全点 isAsyncSafePoint(), 然后返回一个可以插入调用 asyncPreempt() 的PC。

本文基于go version 1.16

参考资料

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容