[TOC]
go 协程执行过程
1. 生成g,并放入队列
- 用户调用go时,编译器会会调用runtime.newproc函数
// go 一个协程,就会调用此函数, 将其放到g的 queue中
// siz 为参数bytes
// fn 则包含了参数及函数相关信息
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
newproc1(fn, argp, siz, gp, pc)
})
}
- 其中 systemstack 会在system stack 上执行 传入的函数
func systemstack(fn func())
如果是g0(per-OS-thread) 或者是signal handling (gsignal)调用systemstack,则会直接执行fn并返回
如果是一个普通的goroutine调用,则会先切换到p0 stack,调用fn,然后再切回
- newproc1 实际创建一个新的g来执行具体函数
// 创建一个新的g
// argp 为参数起点地址
// narg为参数长度
// callerpc为创建这个go的pc
// callergp为创建这个go的g
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
...
_p_ := _g_.m.p.ptr()
// 从gfree list中获取一个空闲g,如果local gfree list没有,则从 global list中获取
newg := gfget(_p_)
// 如果没有空闲g则新分配一个
if newg == nil {
// 创建一个stack size最小的g
newg = malg(_StackMin)
// 设置g的状态为_Gdead(g退出或者新建时为此状态)
// 并且这个函数会在 g->atomicstatus is in a Gscan status时循环等待,直到Gscan state is finished
casgstatus(newg, _Gidle, _Gdead)
// 添加到allgs列表中
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
....
// 初始化newg相关变量
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
// 修改newg为 _Grunnable状态
casgstatus(newg, _Gdead, _Grunnable)
....
// runqput(_p_ *p, gp *g, next bool)
// 尝试将newg放到当前的local runnable queue
// 如果next为true,则将g放到 _p_.runnext(下次调度)
// 如果next为false,则放在队列尾部
// 如果queue满的话,则放在全局的queue中
runqput(_p_, newg, true)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
// 尝试找M执行P的g队列,如果必要的话会创建M
// 如果p==nil,尝试获取一个空闲的p,如果没有则什么也不做
wakep()
}
releasem(_g_.m)
}
- wakep 在g变为 runnable时调用,尝试添加一个P执行g
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
其中 startm如下
// 调度M执行p(创建一个M如果需要的话)
// 如果p为nil,尝试获取一个idle的p,否则啥也不干
func startm(_p_ *p, spinning bool) {
if _p_ == nil {
//尝试获取一个idle的p
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
// 尝试从midle list中获取一个m
mp := mget()
if mp == nil {
.....
// 创建一个新m,并且调用fn或者scheduler
newm(fn, _p_)
.....
}
......
// 标记m状态
notewakeup(&mp.park)
}
2. 调度
go协程的调度由m来处理
M启动时会调用mstart函数, m0在初始化后调用, 其他的的m在线程启动后调用.
2.1 mstart函数的处理如下:
- 调用getg获取当前的g, 这里会获取到g0
- 如果g未分配栈则从当前的栈空间(系统栈空间)上分配, 也就是说g0会使用系统栈空间
- 调用mstart1函数
mstart 函数如下:
func mstart() {
// 获取当前g
_g_ := getg()
//如果g未分配栈则从当前的栈空间(系统栈空间)上分配, 也就是说g0会使用系统栈空间
osStack := _g_.stack.lo == 0
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
......
// 调用mstart1
mstart1()
......
}
mstart1 函数如下:
func mstart1() {
_g_ := getg()
....
//因为schedule会一直循环
// 所以要提前在mcall的stack顶中保留当前的pc和sp (stack pointer (SP))
//方便起它的调用可以reuse the current frame
save(getcallerpc(), getcallersp())
asminit()
minit()
// Install signal handlers; after minit so that minit can
// prepare the thread to be able to handle the signals.
if _g_.m == &m0 {
mstartm0()
}
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 开始调度
schedule()
}
2.2 调用schedule函数后就进入了调度
schedule函数获取g => [必要时休眠] => [唤醒后继续获取] => execute函数执行g => 执行后返回到goexit => 重新执行schedule函数
- schedule
// 找到一个runnable goroutine并执行
func schedule() {
_g_ := getg()
.....
if _g_.m.lockedg != 0 {
// 停止当前m的执行,直到g重新runnable
stoplockedm()
// func execute(gp *g, inheritTime bool)
// 调度gp在当前的M执行
// 如果inheritTime为true,则继承剩下的时间
// 否则重新开启一个time
// 用远不会返回
// 允许Write barriers
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
.....
top:
// 如果当前GC需要停止整个世界(STW), 则调用[stopm](https://github.com/golang/go/blob/go1.13.5/src/runtime/proc.go#L2103)休眠当前的M
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
// 如果M拥有的P中指定了需要在安全点运行的函数(P.runSafePointFn), 则运行它
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
...
// 检测wakeP是否准备OK
tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
//切换为 _Grunnable 状态
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
...
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每调度61次会检测运行下global runnable queue
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
//从本地运行队列获取一个可以运行的g
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
// 如果本地没有可运行的g,则尝试从其他的P中获取(会直接取一半),或者从 global queue中获取,或者poll network,都没有一直尝试获取
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
//如果当前m标记为自旋,则reset
if _g_.m.spinning {
//如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 则唤醒或新建一个M
//这里离开自选状态是为了执行G, 所以会检查是否有空闲的P, 有则表示可以再开新的M执行G
resetspinning()
}
....
//如果G要求回到指定的M(例如上面的runtime.main)
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
// 调用[startlockedm](https://github.com/golang/go/blob/go1.13.5/src/runtime/proc.go#L2083)函数把G和P交给该M, 自己进入休眠
startlockedm(gp)
// 从休眠唤醒后跳到schedule的顶部重试
goto top
}
// 执行G
execute(gp, inheritTime)
}
2.3 调用execute函数 执行具体操作
execute 调度gp在当前的M上运行
如果inheritTime为true,则使用剩余时间,否则就开启一个新的时间片
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 修改g状态为_Grunning
casgstatus(gp, _Grunnable, _Grunning)
....
// 每次调度增加一
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
.....
//这个函数会根据g.sched中保存的状态恢复各个寄存器的值并继续运行g
gogo(&gp.sched)
}
目标函数执行完毕后会调用goexit函数, goexit函数会调用goexit1函数, goexit1函数会通过mcall
mcall这个函数就是用于实现"保存状态"的, 处理如下:
- 设置g.sched.pc等于当前的返回地址
- 设置g.sched.sp等于寄存器rsp的值
- 设置g.sched.g等于当前的g
- 设置g.sched.bp等于寄存器rbp的值
- 切换TLS中当前的g等于m.g0
- 设置寄存器rsp等于g0.sched.sp, 使用g0的栈空间
- 设置第一个参数为原来的g
- 设置rdx寄存器为指向函数地址的指针(上下文)
- 调用指定的函数, 不会返回
goexit1函数会通过mcall调用goexit0函数, goexit0函数调用时已经回到了g0的栈空间, 处理如下:
- 把G的状态由运行中(_Grunning)改为已中止(_Gdead)
- 清空G的成员
- 调用dropg函数解除M和G之间的关联
- 调用gfput函数把G放到P的自由列表中, 下次创建G时可以复用
- 调用schedule函数继续调度
G结束后回到schedule函数, 这样就结束了一个调度循环.
不仅只有G结束会重新开始调度, G被抢占或者等待资源也会重新进行调度
2.4 抢占
runtime.main会创建一个额外的M运行sysmon函数, 抢占就是在sysmon中实现的.
sysmon会进入一个无限循环, 第一轮回休眠20us, 之后每次休眠时间倍增, 最终每一轮都会休眠10ms.
sysmon中有netpool(获取fd事件), retake(抢占), forcegc(按时间强制执行gc), scavenge heap(释放自由列表中多余的项减少内存占用)等处理.
func sysmon() {
....
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
...
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
// 如果超过10ms没有poll则处理下
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
//获取等待poll后runnable的g 列表
list := netpoll(false) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
//[retake](https://github.com/golang/go/blob/master/src/runtime/proc.go#L4388)函数负责处理抢占
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
// 检查是否需要强制GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
- 其中抢占流程主要由retake完成
func retake(now int64) uint32 {
lock(&allpLock)
// 遍历所有p
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
....
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
// 如果G运行太长时间,则Preempt
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now { // 如果超过了forcePreemptNS (10ms),则抢占
// 抢占
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
if s == _Psyscall {
....
// 如果P在系统调用中(_Psyscall), 且经过了一次sysmon循环(20us~10ms), 则抢占这个P
// 调用[handoffp](https://github.com/golang/go/blob/master/src/runtime/proc.go#L1990)解除M和P之间的关联
handoffp(_p_)
}
}
unlock(&allpLock)
return uint32(n))
}
- 通过preemptone来具体实现抢占
// 让在p上运行的g停止
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
// go中每次call都会检测 stack overflow,通过设置gp.stackguard0 = stackPreempt(stackPreempt超过任何的real sp),检测就会意识到这是被抢占了
gp.stackguard0 = stackPreempt
return true
}
参考
Golang源码探索(二) 协程的实现原理
https://github.com/golang/go/tree/go1.13.5