Go为什么高性能?原来GoRoutine是这样实现的

都说go是高性能,吹的最多的在于Goroutine协程,本篇文章将在libgo的基础上讲述它的实现原理,各位看官仁者见仁。

1.go 关键字做了什么?

这便涉及到了go的编译器 gofrontend(该项目中包含了golib,所以下载这一个项目就可以搞懂goroutine的原理)。

// 在文件go\statements.cc中
// Class Go_statement. 通过注释我们知道它是处理go关键字的,
Bstatement*
Go_statement::do_get_backend(Translate_context* context)
{
  ...
  //核心在此处,创建一个Runtime::GO的表达式,而这个表达式是call指令(此处不再展开,因为内部就是表达式的组装)
  //GO的定义为:DEF_GO_RUNTIME(GO, "__go_go", P2(UINTPTR, POINTER), R1(POINTER))
  //而它定义在:static const Runtime_function runtime_functions[]中,故而通过runtime_functions[Runtime::GO]就可以拿到Runtime_function 的定义。
  //而Runtime_function结构的name便是DEF_GO_RUNTIME中的第二个参数,此处为__go_go
  //当触发make_call时会使用__go_go作为函数进行调用,并且传入fn。
  Expression* call = Runtime::make_call(gogo, Runtime::GO, this->location(), 2,
                    fn, arg);
  ...
  return context->backend()->expression_statement(bfunction, bcall);
}

至此我们找到了go的执行入口。

//如果直接检索该关键字并不会找到定义的函数,因为它被go编译器的另一个特性给重命名了
//go:linkname newproc __go_go 将newproc命名为__go_go,故而go执行时最终执行的函数为newproc
func newproc(fn uintptr, arg unsafe.Pointer) *g {
  ...//一些列获取g的空闲对象(g的对象池)
  //获取当前所执行协程所在的p(处理器)对象
  _p_ := _g_.m.p.ptr()
  //如果没有获取到则进行创建
  newg = malg(true, false, &sp, &spsize)
  //将需要执行的函数设置到entryfn 中
  newg.entryfn = fn
  var entry func(unsafe.Pointer)
  *(*unsafe.Pointer)(unsafe.Pointer(&entry)) = unsafe.Pointer(&newg.entryfn)
  newg.entry = entry
  ...
  //给当前协程创建上下文
  makeGContext(newg, sp, spsize)
  //将当前协程添加到处理器中
  runqput(_p_, newg, true)
  //如果已经启动则唤醒p进行执行
  if mainStarted {
        //唤醒p(处理器)
    wakep()
  }
  //至此协程构建成功
  return newg
}

至此我们知道了go关键字触发时如何构建任务的,与任务是如何添加到对应的处理器中,runqput函数并为展开代码,所以需要注意的是每个处理器p只有256个长度的数组作为存储协程的对象,这个在p的结构中会发现,之所以提他是因为runqput函数执行时发现当前处理器任务已经满了,则会添加到全局的sched.runq队列中。该数据类型是队列所以可以无限制增加,当有处理器处理完自己所持有的协程时,将会从该队列中获取任务执行(关键函数:runqput->runqputslow->globrunqputbatch,即可验证上述言论。)。

2._p_是何时创建的?

//libgo\runtime\go-main.c
int main (int argc, char **argv)
{
  ....
  //在go程序启动时会调用runtime_schedinit,而该函数对应libgo\go\runtime\proc.go中schedinit函数
  runtime_schedinit ();
  __go_go ((uintptr)(runtime_main), NULL);
  //其中machine进行执行
  runtime_mstart (runtime_m ());
  abort ();
}

最终执行

func schedinit() {
  ...
  //进行_p_的处理
  if procresize(procs) != nil {
    throw("unknown runnable goroutine during bootstrap")
  }
  ...
}
func procresize(nprocs int32) *p {
  ...
  //首先根据传入的处理器个数创建一个处理器数组
  if nprocs > int32(len(allp)) {
    allp = allp[:nprocs]
    ...
  }
  ...
  //遍历所有处理器,目前都是新建所以runq一定是空的故而等于将所有p通过pidleput添加到idle队列中。
  for i := nprocs - 1; i >= 0; i-- {
    p := allp[i]
    ...
    if runqempty(p) {
      pidleput(p)
    }
  }
  ...
}

至此所有p创建成功。网上文章中g与p都创建完成了还缺少一个m(machine)。

3.何时创建M?

在创建g中存在wakep的函数便是最终唤醒操作,若至此都还没有唤醒则会在创建p后的runtime_mstart中进行执行。首先我们先看wakep。

func wakep() {
   ...
  //启动m
  startm(nil, true)
}
func startm(_p_ *p, spinning bool) {
    mp := acquirem()
    lock(&sched.lock)
    ...
    //在startm调用时并没有p则代表_p_允许为空则从idle中获取一个
    if _p_ == nil {
        //与p创建则相互呼应
        _p_ = pidleget()
    }
    //尝试从machine idle中获取一个如果没有则创建一个新的m
    nmp := mget()
    if nmp == nil {
        //创建一个m
        newm(fn, _p_, id)
        releasem(mp)
        return
    }
     //若存在则将_p_进行绑定
    nmp.spinning = spinning
    nmp.nextp.set(_p_)
    //绑定后则进行唤醒(machine对应的则是具体的线程故而需要唤醒执行)
    notewakeup(&nmp.park)
    releasem(mp)
}
//创建m
func newm(fn func(), _p_ *p, id int64) {
    //首先分配一个m
    mp, _, _ := allocm(_p_, fn, id, false)
    mp.nextp.set(_p_)
    mp.sigmask = initSigmask
    //将分配的m空间传入newm1
    newm1(mp)
    releasem(getg().m)
}

func newm1(mp *m) {
    execLock.rlock() // Prevent process clone.
    newosproc(mp)
    execLock.runlock()
}
//newosproc该函数也进行了重命名文件为:libgo\runtime\proc.c
void runtime_newosproc(M *mp){
    ...
    //共尝试20次
    for (tries = 0; tries < 20; tries++) {
        //创建线程最终执行函数为runtime_mstart
        ret = pthread_create(&tid, &attr, runtime_mstart, mp);
        //当创建成功则跳出否则睡眠等待再次创建
        if (ret != EAGAIN) {
            break;
        }
        runtime_usleep((tries + 1) * 1000); // Milliseconds.
    }
}
//最终执行函数为runtime_mstart,同时也是go的main函数中直接调用的runtime_mstart。
extern void mstart1()
  __asm__(GOSYM_PREFIX "runtime.mstart1");
void* runtime_mstart(void *arg) {
  ...
  //该函数最终跳回到go
  mstart1();
  return nil;
}
func mstart1() {
  //最终调用调度进行执行。
  schedule()
}

至此两种M的创建都完成了,由此得出每个M都绑定了一个线程。

4._p_、g、m 调度是如何的?

上方函数执行完后最终都会调用到schedule中,此处将讲解schedule的实现。

func schedule() {
    //获取当前m正在执行的协程g
    _g_ := getg()
top:
    pp := _g_.m.p.ptr()
    pp.preempt = false
    //若当前需要gc了则进去gcstopm进行等待
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    //是否存在安全点函数,存在则调用
    if pp.runSafePointFn != 0 {
        runSafePointFn()
    }
    ...
    var gp *g
    var inheritTime bool
    //是否需要调用wakep默认false
    tryWakeP := false

    //如果启动了跟踪则执行跟踪的协程g
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    //不存在追踪则是否有gc工作,有的话获取进行优先执行
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        if gp != nil {
            tryWakeP = true
        }
    }
    //也不存在gc工作,那么则从全局sched中获取任务
    //在获取执行任务的同时从sched中批量获取任务添加到当前_p_的任务队列中,因为sched的锁是全局锁,会导致所有的m进行竞争
    //多次的竞争会导致性能降低,故而批量获取与jvm的电梯算法解决相同问题
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    //若不存在全局任务,则从本地任务中获取
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        //获取的任务发现是抢占任务,则丢到全局队列中,返回top从新执行
        if gp != nil && gp.preempt {
            gp.preempt = false
            lock(&sched.lock)
            globrunqput(gp)
            unlock(&sched.lock)
            goto top
        }
    }
    //若还未找到可执行的任务则从别的_p_中窃取
    if gp == nil {
        gp, inheritTime = findrunnable() 
    }
    ...
    if tryWakeP {
        wakep()
    }
    //如果当前任务是需要暂停当前m则进行停止
    if gp.lockedm != 0 {
        //在内部调用stopm
        startlockedm(gp)
        goto top
    }
    //否则最终执行该协程g
    execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
  ...
  //最终调用gogo
  gogo(gp)
}
//被重命名对应文件:libgo\runtime\proc.c
void runtime_gogo(G* newg) {
    g = newg;
    newg->fromgogo = true;
    fixcontext(ucontext_arg(&newg->context[0]));
    //恢复当前协程g的上下文
    __go_setcontext(ucontext_arg(&newg->context[0]));
    runtime_throw("gogo setcontext returned");
}
//该函数是以汇编实现的
.globl __go_setcontext
.text
__go_setcontext:
    movq    RBX_OFF(%rdi), %rbx
    movq    RBP_OFF(%rdi), %rbp
    movq    R12_OFF(%rdi), %r12
    movq    R13_OFF(%rdi), %r13
    movq    R14_OFF(%rdi), %r14
    movq    R15_OFF(%rdi), %r15
    movq    SP_OFF(%rdi), %rsp  //ret返回函数的地址
    movq    PC_OFF(%rdi), %rdx

    jmp *%rdx

至此go的完整执行,在切换上下文时并没有产生系统调用故而实在用户层切换,由此性能得到了显著提高,但是并不代表处理性能的提高,处理速度与任务个数并没有必然联系。那么它是如何做到处理性能提高的呢?

5.处理性能提高的核心秘密

核心在于减少了线程执行中的await操作,说白了让线程一直运行,从不阻塞(意义上),此处以网络与文件操作(耗时操作)为例。

//文件读取
func (f *File) read(b []byte) (n int, err error) {
  //调用了FD的read函数
  n, err = f.pfd.Read(b)
  ...
}
func (fd *FD) Read(p []byte) (int, error) {
  ....
  //最终执行waitRead
 if err = fd.pd.waitRead(fd.isFile); err == nil {
  }
}
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
    //至此我们看到了pollWait其核心便在此处
    //需要注意的是当前函数在internal目录中package poll包下
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}
//重命名为poll下的
//go:linkname poll_runtime_pollWait internal_1poll.runtime__pollWait
func poll_runtime_pollWait(ctx uintptr, mode int) int {
    //调用block进行阻塞
    for !netpollblock(pd, int32(mode), false) {
    }
    return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    ...
    //其核心在此处进行park操作
    gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    ...
}
//阻塞是需要执行的函数
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
     //将当前协程g设置到gpp地址中,gpp地址便是&pd.rg或者&pd.wg
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    ...
    //进入此处调用mcall
    mcall(park_m)
}
//进行了重命名
void runtime_mcall(FuncVal *fv) {
    G *gp;
    //获取当前协程g
    gp = g;
    //此处必然不是空
    if(gp != nil) {
        gp->fromgogo = false;
        //将当前协程go的上下文存储到context中,汇编实现
        __go_getcontext(ucontext_arg(&gp->context[0]));
        //(原注释)当我们从 getcontext 返回时,我们可能运行在一个新的线程中。这意味着 g 可能已经改变。g 是一个全局变量,所以我们会重新加载它,但 g 的地址可能已经被缓存到我们的本地栈帧中,而这个地址可能是错误的。调用函数来为当前线程重新加载这个值。
        gp = runtime_g();
       
    }
    //若gp未发生改变那么fromgogo则为false或者当前gp的fromgogo本身也是false则进入此处进行执行
    if (gp == nil || !gp->fromgogo) {
        //将传入的函数设置到entry
        mp->g0->entry = fv;
        mp->g0->param = gp;
        //然后将g0给全局变量g
        g = mp->g0;
        //将当前协程g的上下文还原
        __go_setcontext(ucontext_arg(&mp->g0->context[0]));
        runtime_throw("runtime: mcall function returned");
    }
}

至此完成了上下文的切换,但是此处执行的g0和当前任务无关,那么何时调用的传入的park_m呢?核心函数是kickoff。

//前文调用过makeGContext进行构建协程g的上下文
void makeGContext(G* gp, byte* sp, uintptr spsize) {
    __go_context_t *uc;

    uc = ucontext_arg(&gp->context[0]);
    __go_getcontext(uc);
    //构建上下文
    __go_makecontext(uc, kickoff, sp, (size_t)spsize);
}
//构建上下文
.globl __go_makecontext
.text
__go_makecontext:
    addq    %rcx, %rdx
    andq    $~0xf, %rdx
    subq    $8, %rdx
    movq    $0, (%rdx)
   //将传入的kickoff地址记录到PC_OFF处
    movq    %rdx, SP_OFF(%rdi)
    movq    %rsi, PC_OFF(%rdi)

    ret

从而得知每个协程构建时都会有个ret返回的pc地址,而该地址便是统一的kickoff函数。那么前文所切换的g0当执行完后则会进入该函数。

func kickoff() {
    gp := getg()
    //获取当前设置的entry 也即park_m
    fv := gp.entry
    param := gp.param
    ...
    gp.entrysp = getsp()
    //进行调用
    fv(param)
    goexit1()
}
func park_m(gp *g) {
    _g_ := getg()
    ...
    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()
    //waitunlockf 为netpollblockcommit函数
    //waitlock 则是&pd.rg或者&pd.wg
    if fn := _g_.m.waitunlockf; fn != nil {
        //调用netpollblockcommit
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
    }
    //进行调度
    schedule()
}

至此协程切换完成,那么何时又如何唤醒呢?

//schedule中包含一个函数findrunnable
func findrunnable() (gp *g, inheritTime bool) {
    ...
    //包含如下内容
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        //从netpoll中获取一个列表
        if list := netpoll(0); !list.empty() { // non-blocking
            //从列表中弹出一个协程g
            gp := list.pop()
            //将剩余的协程加入到运行队列中
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
    ...
}

func netpoll(delay int64) gList {
    ...
    var events [128]epollevent
retry:
    //进入系统调用进行wait,
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        e := errno()
        if e != _EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", e)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    //上方epollwait若查不出结果则直接返回空,否则进入for迭代事件
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        ...
        //从ev中获取自定义data 此处为pd(pollDesc)
        pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
        pd.setEventErr(ev.events == _EPOLLERR)
        //将其设置为ready状态
        netpollready(&toRun, pd, mode)
    }
    return toRun
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    //根据了下获取pd中对应的协程g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    //将获取的结果加入到运行队列中
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        //在park_m中netpollblockcommit 设置了gpp为对应的协程g的地址,此处进行读取
        old := gpp.Load()
        ...
        //设置协程地址为pdReady
        var new uintptr
        if ioready {
            new = pdReady
        }
        if gpp.(old, new) {
            if old == pdWait {
                old = 0
            }
            //最终返回协程对象
            return (*g)(unsafe.Pointer(old))
        }
    }
}

这便是协程的全部执行逻辑,前文提到以文件与网络作为案例,而在操作文件是发现他共用了网络的epoll解决阻塞问题。

6.总结

协程的核心在于任务池(协程池),不管是_p_内部的256任务数组,还是全局的sched结构都是为了记录用户通过go所创建的任务。若以该设计来说任何语言均可实现。但是go之所以凸出他还实现了阻塞时的任务间的上下文切换,从而减少了系统调用(若是java线程池,阻塞则进入系统级别的上下文切换,也即将这一套操作交由操作系统完成,而go则时用户级别实现,从而大大的提高性能,可以保证线程不暂停的一直运行下去)。该设计对于io密集型项目性能将由显著的提高,因为当协程遇到阻塞就进行切换,其工作的内容交由操作系统完成,他则继续执行别的协程,当系统执行完阻塞内容后再次将对应协程加入到执行队列中,从而提高阻塞任务的并行数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容