以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect
3. Goroutine调度策略
前面提过,所谓的goroutine调度,是指程序代码按照一定的算法找到合适的g
并放到CPU上去运行的过程。
所以有以下三个问题需要思考解决
- 调度时机:什么时候发生调度
- 调度策略:选取哪个
g
去调度 - 切换机制:怎么把
g
放到CPU中(这就是我们说的gogo
)
下面我们主要未然调度策略展开,看下我们是怎么选择g
的。
1. schedule的操作
之前我们看到schedule
在调度循环中将一个g
放到CPU上去运行,但是在之前它还负责找到一个合适的g
去调度。
这次我们主要关注那些调度算法
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg() //_g_ = m.g0
......
var gp *g
......
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
//为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行,
//因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) //所有工作线程都能访问全局运行队列,所以需要加锁
gp = globrunqget(_g_.m.p.ptr(), 1) //从全局运行队列中获取1个goroutine
unlock(&sched.lock)
}
}
if gp == nil {
//从与m关联的p的本地运行队列中获取goroutine
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
//如果从本地运行队列和全局运行队列都没有找到需要运行的goroutine,
//则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠,
//直到获取到需要运行的goroutine之后findrunnable函数才会返回。
gp, inheritTime = findrunnable() // blocks until work is available
}
......
//当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间
//调用execte切换到gp的代码和栈空间去运行
execute(gp, inheritTime)
}
由代码可见,schedule
函数大概分为了三步去寻找可运行的g
三步查找
第一步
从全局队列中寻找g
。为了保证调度的公平性,每个工作线程每经过61次调度就需要优先尝试从全局队列中找出一个g
来运行,这样保证每个g
都有机会得到调用。
之前提过曲剧队列是所有工作线程都可以访问的,所以在访问它之前需要加锁。
第二步
从本地运行队列寻找g
,这个是常态,基本上都是从这里获取。这个的前提是在上一步没有取到g
第三步
从其他工作线程的运行队列中偷取g。
下面我们详细看下这三步
2.全局运行队列获取goroutine
下面我们来看下获取g的函数globrunqget
// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 { // 全局运行队列为空,前面已经加锁了
return nil
}
// 以下为一些数量限制
n := sched.runqsize/gomaxprocs + 1 // 按照p数量均分
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max { // 不超过最大值
n = max
}
if n > int32(len(_p_.runq))/2 { // 不超过一半,如果runq为1个是不是永远取不走???
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n // 更新个数
gp := sched.runq.pop() // 至少取走一个,
n--
for ; n > 0; n-- { // 循环取走,并放入本地运行队列
gp1 := sched.runq.pop() // 取
runqput(_p_, gp1, false) // 放
}
return gp // 把第一个return ,用做运行
}
该函数的第一个参数当前工作线程绑定的p
,第二个参数是最多可以取多少个。
并且如果多于1个,那么其他取出放入本地运行队列。
3. 本地运行队列中获取goroutine
函数runqget
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
for {
next := _p_.runnext // runnext 高优插队者,如果有的话 优先runnext TODO 查询runnext在什么条件下出现
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
for {
// 获取首尾
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
// 原子+1
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
这里需要注意的是取gp时候是加锁的。
因为有其他的工作线程有可能也在访问这2个成员,比如有人正在偷你的
g
对于语义
<img src="http://picgo.vipkk.work/20200523210047.png" alt="image-20200523210047055" style="zoom:50%;" />
CAS操作和ABA问题
CAS 操作会造成[ABA问题][ABA]
那么在runqget中会不会呢,答案是不会的,分析如下
对于runnext
- 只有runqget才会将其改为0,其他地方都只是将他改为非0值
- 任何时刻只有一个当前工作线程可以执行runqget,不会并发,也就不存在其他人将其改为0
对于runq
- 只有当前线程才会去往自己的队列添加g,知会去偷g,从而修改runqhead
- 所以,当我们这个工作线程从runqhead读取到值A之后,其它工作线程也就不可能修改runqhead的值为B之后再第二次把它修改为值A(因为runqtail在这段时间之内不可能被修改,runqhead的值也就无法越过runqtail再回绕到A值),也就是说,代码从逻辑上已经杜绝了引发ABA的条件。
4. 从其他线程偷取G
//如果从本地运行队列和全局运行队列都没有找到需要运行的goroutine, //则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠, //直到获取到需要运行的goroutine之后findrunnable函数才会返回。 gp, inheritTime = findrunnable() // blocks until work is available
findrunnable
函数负责处理与盗取相关的逻辑,另外该函数还负责gc和netpoll等相关的事情。目前我们先关注偷取算法。因为代码太长我们分步骤来看
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
}
在偷取之前再看下本地和全局有没有可运行的队列,有的话取出返回收工
// local runq 检查本地队列是否有可以运行的
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq 检查全局队列是否有可以运行的
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
接下来是一些netpoll相关,跳过。
再然后是fastpath,快速失败
需要注意fastpath的2个点
- m 没有在自旋
- 自旋的个数 大于 work的个数。
2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle)
// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
// 这里的意思是 m还没有开始寻找,但是已经有超多一半的m在寻找了,那么它就不找了,这样省CPU。
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
接着开始偷取,一共进行4次尝试。如果成功,则偷取一半并返回。当进行第3、4次尝试的时候,也会尝试偷取runnext。
这里的偷取为了公平是随机选p的。不论位置还是下一个都是随机的详见[随机偷取算法](# 随机偷取算法)
for i := 0; i < 4; i++ {
// 随机开始,随机下一个
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// 如果成功取出一半
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
}
}
当找到的时候,通过runqsteal
来偷取,
这个逻辑比较简单,grab之后获取到根据大小来跟新 runqhead & runqtail
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
接下来就是runqgrad
这里负责了真正的偷取
grad 根据_p_
的大小判断偷取的一半,然后放进batch里,再更新head&tail。 至此整个偷取算法完成。
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n == 0 {
...
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
再回到findrunnable,如果四次都没有成功则进入stop
模式(ps 忽略runtime,暂时没看懂)
在这里首先将m与p解除绑定,然后准备去休眠,然后把p放到空闲队列,不过在结束之前进行了最后一次判断。
最后stopm
stop:
......
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp
// return P and block
lock(&sched.lock)
......
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
// 当前工作线程解除与p之间的绑定,准备去休眠
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
//把p放入空闲队列
pidleput(_p_)
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
//m即将睡眠,状态不再是spinning
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
// 休眠之前再看一下是否有工作要做
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
......
//休眠
stopm()
goto top
}
最后万般无奈进入stopm
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m) //把m结构体对象放入sched.midle空闲队列
unlock(&sched.lock)
notesleep(&_g_.m.park) //进入睡眠状态
//被其它工作线程唤醒
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
stopm的主要操作就是将m放到 midle队列里,然后notesleep 进入睡眠状态。note的是go的一次性睡眠和唤醒机制,具体参考note细节。
继续当notesleep返回后,继续绑定p 然后开始新的循环调度。
至于notesleep,它是通过futex完成的sleep。
从前面的代码我们已经看到,stopm调用notesleep时给它传递的参数是m结构体的park成员,而m又早已通过mput放入了全局的milde空闲队列,这样其它运行着的线程一旦发现有更多的goroutine需要运行时就可以通过全局的m空闲队列找到处于睡眠状态的m,然后调用notewakeup(&m.park)将其唤醒,至于怎么唤醒,我们在其它章节继续讨论.
补充
随机偷取算法
type randomEnum struct {
i uint32
count uint32
pos uint32
inc uint32
}
func (ord *randomOrder) reset(count uint32) {
ord.count = count
ord.coprimes = ord.coprimes[:0]
for i := uint32(1); i <= count; i++ {
if gcd(i, count) == 1 {
ord.coprimes = append(ord.coprimes, i) // 找比count小的质数作为incr
}
}
}
func (ord *randomOrder) start(i uint32) randomEnum {
return randomEnum{
count: ord.count,
pos: i % ord.count,
inc: ord.coprimes[i%uint32(len(ord.coprimes))], // 随机选一个质数
}
}
func (enum *randomEnum) done() bool {
return enum.i == enum.count
}
func (enum *randomEnum) next() {
enum.i++
enum.pos = (enum.pos + enum.inc) % enum.count // 随机下一个
}
下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。
如果第一次随机选择的offset = 6,coprime = 3(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:
6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3
如果第二次随机选择的offset = 4,coprime = 5的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:
1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4
可以看到只要随机数不一样,偷取p的顺序也不一样,但可以保证经过8次循环,每个p都会被访问到。可以用数论知识证明,不管nprocs是多少,这个算法都可以保证经过nprocs次循环,每个p都可以得到访问。
公平公正
note细节
note是go runtime实现的一次性睡眠和唤醒机制,一个线程可以通过调用notesleep(*note)进入睡眠状态,而另外一个线程则可以通过notewakeup(*note)把其唤醒。note的底层实现机制跟操作系统相关,不同系统使用不同的机制,比如linux下使用的futex系统调用,而mac下则是使用的pthread_cond_t条件变量,note对这些底层机制做了一个抽象和封装,这种封装给扩展性带来了很大的好处,比如当睡眠和唤醒功能需要支持新平台时,只需要在note层增加对特定平台的支持即可,不需要修改上层的任何代码。