深入分析go调度(三)

以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

3. Goroutine调度策略

前面提过,所谓的goroutine调度,是指程序代码按照一定的算法找到合适的g并放到CPU上去运行的过程。

所以有以下三个问题需要思考解决

  1. 调度时机:什么时候发生调度
  2. 调度策略:选取哪个g去调度
  3. 切换机制:怎么把g放到CPU中(这就是我们说的gogo

下面我们主要未然调度策略展开,看下我们是怎么选择g的。

1. schedule的操作

之前我们看到schedule在调度循环中将一个g放到CPU上去运行,但是在之前它还负责找到一个合适的g去调度。

这次我们主要关注那些调度算法

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()   //_g_ = m.g0

    ......

    var gp *g

    ......
   
    if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
       //为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行,
       //因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock) //所有工作线程都能访问全局运行队列,所以需要加锁
            gp = globrunqget(_g_.m.p.ptr(), 1) //从全局运行队列中获取1个goroutine
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        //从与m关联的p的本地运行队列中获取goroutine
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        //如果从本地运行队列和全局运行队列都没有找到需要运行的goroutine,
        //则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠,
        //直到获取到需要运行的goroutine之后findrunnable函数才会返回。
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    ......

    //当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间
    //调用execte切换到gp的代码和栈空间去运行
    execute(gp, inheritTime)  
}

由代码可见,schedule函数大概分为了三步去寻找可运行的g

三步查找

第一步

从全局队列中寻找g。为了保证调度的公平性,每个工作线程每经过61次调度就需要优先尝试从全局队列中找出一个g来运行,这样保证每个g都有机会得到调用。

之前提过曲剧队列是所有工作线程都可以访问的,所以在访问它之前需要加锁。

第二步

从本地运行队列寻找g,这个是常态,基本上都是从这里获取。这个的前提是在上一步没有取到g

第三步

从其他工作线程的运行队列中偷取g。

下面我们详细看下这三步

2.全局运行队列获取goroutine

下面我们来看下获取g的函数globrunqget

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 { // 全局运行队列为空,前面已经加锁了
        return nil
    }

    // 以下为一些数量限制
    n := sched.runqsize/gomaxprocs + 1 // 按照p数量均分
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max { // 不超过最大值
        n = max
    }
    if n > int32(len(_p_.runq))/2 { // 不超过一半,如果runq为1个是不是永远取不走???
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n // 更新个数

    gp := sched.runq.pop() // 至少取走一个,
    n--
    for ; n > 0; n-- { // 循环取走,并放入本地运行队列
        gp1 := sched.runq.pop()  // 取
        runqput(_p_, gp1, false) // 放
    }
    return gp // 把第一个return ,用做运行
}

该函数的第一个参数当前工作线程绑定的p,第二个参数是最多可以取多少个。

并且如果多于1个,那么其他取出放入本地运行队列。

3. 本地运行队列中获取goroutine

函数runqget

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    for {
        next := _p_.runnext // runnext 高优插队者,如果有的话 优先runnext TODO 查询runnext在什么条件下出现
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }

    for {
        // 获取首尾
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        // 原子+1
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

这里需要注意的是取gp时候是加锁的。

因为有其他的工作线程有可能也在访问这2个成员,比如有人正在偷你的g

对于语义

<img src="http://picgo.vipkk.work/20200523210047.png" alt="image-20200523210047055" style="zoom:50%;" />

CAS操作和ABA问题

CAS 操作会造成[ABA问题][ABA]

那么在runqget中会不会呢,答案是不会的,分析如下

对于runnext

  1. 只有runqget才会将其改为0,其他地方都只是将他改为非0值
  2. 任何时刻只有一个当前工作线程可以执行runqget,不会并发,也就不存在其他人将其改为0

对于runq

  1. 只有当前线程才会去往自己的队列添加g,知会去偷g,从而修改runqhead
  2. 所以,当我们这个工作线程从runqhead读取到值A之后,其它工作线程也就不可能修改runqhead的值为B之后再第二次把它修改为值A(因为runqtail在这段时间之内不可能被修改,runqhead的值也就无法越过runqtail再回绕到A值),也就是说,代码从逻辑上已经杜绝了引发ABA的条件。

4. 从其他线程偷取G

      //如果从本地运行队列和全局运行队列都没有找到需要运行的goroutine,
      //则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠,
      //直到获取到需要运行的goroutine之后findrunnable函数才会返回。
      gp, inheritTime = findrunnable() // blocks until work is available

findrunnable函数负责处理与盗取相关的逻辑,另外该函数还负责gc和netpoll等相关的事情。目前我们先关注偷取算法。因为代码太长我们分步骤来看

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
}

在偷取之前再看下本地和全局有没有可运行的队列,有的话取出返回收工

    // local runq 检查本地队列是否有可以运行的
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq 检查全局队列是否有可以运行的
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

接下来是一些netpoll相关,跳过。

再然后是fastpath,快速失败

需要注意fastpath的2个点

  1. m 没有在自旋
  2. 自旋的个数 大于 work的个数。2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle)
    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    ranTimer := false
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    // 这里的意思是 m还没有开始寻找,但是已经有超多一半的m在寻找了,那么它就不找了,这样省CPU。
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }

接着开始偷取,一共进行4次尝试。如果成功,则偷取一半并返回。当进行第3、4次尝试的时候,也会尝试偷取runnext。

这里的偷取为了公平是随机选p的。不论位置还是下一个都是随机的详见[随机偷取算法](# 随机偷取算法)

    for i := 0; i < 4; i++ {
        // 随机开始,随机下一个
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            p2 := allp[enum.position()]
            if _p_ == p2 {
                continue
            }
            // 如果成功取出一半
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

当找到的时候,通过runqsteal来偷取,

这个逻辑比较简单,grab之后获取到根据大小来跟新 runqhead & runqtail

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    n--
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    if t-h+n >= uint32(len(_p_.runq)) {
        throw("runqsteal: runq overflow")
    }
    atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
    return gp
}

接下来就是runqgrad这里负责了真正的偷取

grad 根据_p_的大小判断偷取的一半,然后放进batch里,再更新head&tail。 至此整个偷取算法完成。

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        n = n - n/2
        if n == 0 {
            ...
            return 0
        }
        if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
            continue
        }
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}

再回到findrunnable,如果四次都没有成功则进入stop模式(ps 忽略runtime,暂时没看懂)

在这里首先将m与p解除绑定,然后准备去休眠,然后把p放到空闲队列,不过在结束之前进行了最后一次判断。

最后stopm

stop:

    ......

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
 
    ......
 
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
   
    // 当前工作线程解除与p之间的绑定,准备去休眠
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    //把p放入空闲队列
    pidleput(_p_)
    unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        //m即将睡眠,状态不再是spinning
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    // 休眠之前再看一下是否有工作要做
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    ......
    //休眠
    stopm()
    goto top
}

最后万般无奈进入stopm

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m)   //把m结构体对象放入sched.midle空闲队列
    unlock(&sched.lock)
    notesleep(&_g_.m.park)  //进入睡眠状态
 
    //被其它工作线程唤醒
    noteclear(&_g_.m.park)
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

stopm的主要操作就是将m放到 midle队列里,然后notesleep 进入睡眠状态。note的是go的一次性睡眠和唤醒机制,具体参考note细节

继续当notesleep返回后,继续绑定p 然后开始新的循环调度。

至于notesleep,它是通过futex完成的sleep。

从前面的代码我们已经看到,stopm调用notesleep时给它传递的参数是m结构体的park成员,而m又早已通过mput放入了全局的milde空闲队列,这样其它运行着的线程一旦发现有更多的goroutine需要运行时就可以通过全局的m空闲队列找到处于睡眠状态的m,然后调用notewakeup(&m.park)将其唤醒,至于怎么唤醒,我们在其它章节继续讨论.

补充

随机偷取算法


type randomEnum struct {
    i     uint32
    count uint32
    pos   uint32
    inc   uint32
}

func (ord *randomOrder) reset(count uint32) {
    ord.count = count
    ord.coprimes = ord.coprimes[:0]
    for i := uint32(1); i <= count; i++ {
        if gcd(i, count) == 1 {
            ord.coprimes = append(ord.coprimes, i) // 找比count小的质数作为incr
        }
    }
}

func (ord *randomOrder) start(i uint32) randomEnum {
    return randomEnum{
        count: ord.count,
        pos:   i % ord.count,
        inc:   ord.coprimes[i%uint32(len(ord.coprimes))], // 随机选一个质数
    }
}

func (enum *randomEnum) done() bool {
    return enum.i == enum.count
}

func (enum *randomEnum) next() {
    enum.i++
    enum.pos = (enum.pos + enum.inc) % enum.count // 随机下一个
}

下面举例说明一下上述算法过程,现假设nprocs为8,也就是一共有8个p。

如果第一次随机选择的offset = 6,coprime = 3(3与8互质,满足算法要求)的话,则从allp切片中偷取的下标顺序为6, 1, 4, 7, 2, 5, 0, 3,计算过程:

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

如果第二次随机选择的offset = 4,coprime = 5的话,则从allp切片中偷取的下标顺序为1, 6, 3, 0, 5, 2, 7, 4,计算过程:

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

可以看到只要随机数不一样,偷取p的顺序也不一样,但可以保证经过8次循环,每个p都会被访问到。可以用数论知识证明,不管nprocs是多少,这个算法都可以保证经过nprocs次循环,每个p都可以得到访问。

公平公正

note细节

note是go runtime实现的一次性睡眠和唤醒机制,一个线程可以通过调用notesleep(*note)进入睡眠状态,而另外一个线程则可以通过notewakeup(*note)把其唤醒。note的底层实现机制跟操作系统相关,不同系统使用不同的机制,比如linux下使用的futex系统调用,而mac下则是使用的pthread_cond_t条件变量,note对这些底层机制做了一个抽象和封装,这种封装给扩展性带来了很大的好处,比如当睡眠和唤醒功能需要支持新平台时,只需要在note层增加对特定平台的支持即可,不需要修改上层的任何代码。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容