都说go是高性能,吹的最多的在于Goroutine协程,本篇文章将在libgo的基础上讲述它的实现原理,各位看官仁者见仁。
1.go 关键字做了什么?
这便涉及到了go的编译器 gofrontend(该项目中包含了golib,所以下载这一个项目就可以搞懂goroutine的原理)。
// 在文件go\statements.cc中
// Class Go_statement. 通过注释我们知道它是处理go关键字的,
Bstatement*
Go_statement::do_get_backend(Translate_context* context)
{
...
//核心在此处,创建一个Runtime::GO的表达式,而这个表达式是call指令(此处不再展开,因为内部就是表达式的组装)
//GO的定义为:DEF_GO_RUNTIME(GO, "__go_go", P2(UINTPTR, POINTER), R1(POINTER))
//而它定义在:static const Runtime_function runtime_functions[]中,故而通过runtime_functions[Runtime::GO]就可以拿到Runtime_function 的定义。
//而Runtime_function结构的name便是DEF_GO_RUNTIME中的第二个参数,此处为__go_go
//当触发make_call时会使用__go_go作为函数进行调用,并且传入fn。
Expression* call = Runtime::make_call(gogo, Runtime::GO, this->location(), 2,
fn, arg);
...
return context->backend()->expression_statement(bfunction, bcall);
}
至此我们找到了go的执行入口。
//如果直接检索该关键字并不会找到定义的函数,因为它被go编译器的另一个特性给重命名了
//go:linkname newproc __go_go 将newproc命名为__go_go,故而go执行时最终执行的函数为newproc
func newproc(fn uintptr, arg unsafe.Pointer) *g {
...//一些列获取g的空闲对象(g的对象池)
//获取当前所执行协程所在的p(处理器)对象
_p_ := _g_.m.p.ptr()
//如果没有获取到则进行创建
newg = malg(true, false, &sp, &spsize)
//将需要执行的函数设置到entryfn 中
newg.entryfn = fn
var entry func(unsafe.Pointer)
*(*unsafe.Pointer)(unsafe.Pointer(&entry)) = unsafe.Pointer(&newg.entryfn)
newg.entry = entry
...
//给当前协程创建上下文
makeGContext(newg, sp, spsize)
//将当前协程添加到处理器中
runqput(_p_, newg, true)
//如果已经启动则唤醒p进行执行
if mainStarted {
//唤醒p(处理器)
wakep()
}
//至此协程构建成功
return newg
}
至此我们知道了go关键字触发时如何构建任务的,与任务是如何添加到对应的处理器中,runqput函数并为展开代码,所以需要注意的是每个处理器p只有256个长度的数组作为存储协程的对象,这个在p的结构中会发现,之所以提他是因为runqput函数执行时发现当前处理器任务已经满了,则会添加到全局的sched.runq队列中。该数据类型是队列所以可以无限制增加,当有处理器处理完自己所持有的协程时,将会从该队列中获取任务执行(关键函数:runqput->runqputslow->globrunqputbatch,即可验证上述言论。)。
2._p_是何时创建的?
//libgo\runtime\go-main.c
int main (int argc, char **argv)
{
....
//在go程序启动时会调用runtime_schedinit,而该函数对应libgo\go\runtime\proc.go中schedinit函数
runtime_schedinit ();
__go_go ((uintptr)(runtime_main), NULL);
//其中machine进行执行
runtime_mstart (runtime_m ());
abort ();
}
最终执行
func schedinit() {
...
//进行_p_的处理
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
func procresize(nprocs int32) *p {
...
//首先根据传入的处理器个数创建一个处理器数组
if nprocs > int32(len(allp)) {
allp = allp[:nprocs]
...
}
...
//遍历所有处理器,目前都是新建所以runq一定是空的故而等于将所有p通过pidleput添加到idle队列中。
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
...
if runqempty(p) {
pidleput(p)
}
}
...
}
至此所有p创建成功。网上文章中g与p都创建完成了还缺少一个m(machine)。
3.何时创建M?
在创建g中存在wakep的函数便是最终唤醒操作,若至此都还没有唤醒则会在创建p后的runtime_mstart中进行执行。首先我们先看wakep。
func wakep() {
...
//启动m
startm(nil, true)
}
func startm(_p_ *p, spinning bool) {
mp := acquirem()
lock(&sched.lock)
...
//在startm调用时并没有p则代表_p_允许为空则从idle中获取一个
if _p_ == nil {
//与p创建则相互呼应
_p_ = pidleget()
}
//尝试从machine idle中获取一个如果没有则创建一个新的m
nmp := mget()
if nmp == nil {
//创建一个m
newm(fn, _p_, id)
releasem(mp)
return
}
//若存在则将_p_进行绑定
nmp.spinning = spinning
nmp.nextp.set(_p_)
//绑定后则进行唤醒(machine对应的则是具体的线程故而需要唤醒执行)
notewakeup(&nmp.park)
releasem(mp)
}
//创建m
func newm(fn func(), _p_ *p, id int64) {
//首先分配一个m
mp, _, _ := allocm(_p_, fn, id, false)
mp.nextp.set(_p_)
mp.sigmask = initSigmask
//将分配的m空间传入newm1
newm1(mp)
releasem(getg().m)
}
func newm1(mp *m) {
execLock.rlock() // Prevent process clone.
newosproc(mp)
execLock.runlock()
}
//newosproc该函数也进行了重命名文件为:libgo\runtime\proc.c
void runtime_newosproc(M *mp){
...
//共尝试20次
for (tries = 0; tries < 20; tries++) {
//创建线程最终执行函数为runtime_mstart
ret = pthread_create(&tid, &attr, runtime_mstart, mp);
//当创建成功则跳出否则睡眠等待再次创建
if (ret != EAGAIN) {
break;
}
runtime_usleep((tries + 1) * 1000); // Milliseconds.
}
}
//最终执行函数为runtime_mstart,同时也是go的main函数中直接调用的runtime_mstart。
extern void mstart1()
__asm__(GOSYM_PREFIX "runtime.mstart1");
void* runtime_mstart(void *arg) {
...
//该函数最终跳回到go
mstart1();
return nil;
}
func mstart1() {
//最终调用调度进行执行。
schedule()
}
至此两种M的创建都完成了,由此得出每个M都绑定了一个线程。
4._p_、g、m 调度是如何的?
上方函数执行完后最终都会调用到schedule中,此处将讲解schedule的实现。
func schedule() {
//获取当前m正在执行的协程g
_g_ := getg()
top:
pp := _g_.m.p.ptr()
pp.preempt = false
//若当前需要gc了则进去gcstopm进行等待
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
//是否存在安全点函数,存在则调用
if pp.runSafePointFn != 0 {
runSafePointFn()
}
...
var gp *g
var inheritTime bool
//是否需要调用wakep默认false
tryWakeP := false
//如果启动了跟踪则执行跟踪的协程g
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
//不存在追踪则是否有gc工作,有的话获取进行优先执行
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
if gp != nil {
tryWakeP = true
}
}
//也不存在gc工作,那么则从全局sched中获取任务
//在获取执行任务的同时从sched中批量获取任务添加到当前_p_的任务队列中,因为sched的锁是全局锁,会导致所有的m进行竞争
//多次的竞争会导致性能降低,故而批量获取与jvm的电梯算法解决相同问题
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
//若不存在全局任务,则从本地任务中获取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
//获取的任务发现是抢占任务,则丢到全局队列中,返回top从新执行
if gp != nil && gp.preempt {
gp.preempt = false
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
goto top
}
}
//若还未找到可执行的任务则从别的_p_中窃取
if gp == nil {
gp, inheritTime = findrunnable()
}
...
if tryWakeP {
wakep()
}
//如果当前任务是需要暂停当前m则进行停止
if gp.lockedm != 0 {
//在内部调用stopm
startlockedm(gp)
goto top
}
//否则最终执行该协程g
execute(gp, inheritTime)
}
func execute(gp *g, inheritTime bool) {
...
//最终调用gogo
gogo(gp)
}
//被重命名对应文件:libgo\runtime\proc.c
void runtime_gogo(G* newg) {
g = newg;
newg->fromgogo = true;
fixcontext(ucontext_arg(&newg->context[0]));
//恢复当前协程g的上下文
__go_setcontext(ucontext_arg(&newg->context[0]));
runtime_throw("gogo setcontext returned");
}
//该函数是以汇编实现的
.globl __go_setcontext
.text
__go_setcontext:
movq RBX_OFF(%rdi), %rbx
movq RBP_OFF(%rdi), %rbp
movq R12_OFF(%rdi), %r12
movq R13_OFF(%rdi), %r13
movq R14_OFF(%rdi), %r14
movq R15_OFF(%rdi), %r15
movq SP_OFF(%rdi), %rsp //ret返回函数的地址
movq PC_OFF(%rdi), %rdx
jmp *%rdx
至此go的完整执行,在切换上下文时并没有产生系统调用故而实在用户层切换,由此性能得到了显著提高,但是并不代表处理性能的提高,处理速度与任务个数并没有必然联系。那么它是如何做到处理性能提高的呢?
5.处理性能提高的核心秘密
核心在于减少了线程执行中的await操作,说白了让线程一直运行,从不阻塞(意义上),此处以网络与文件操作(耗时操作)为例。
//文件读取
func (f *File) read(b []byte) (n int, err error) {
//调用了FD的read函数
n, err = f.pfd.Read(b)
...
}
func (fd *FD) Read(p []byte) (int, error) {
....
//最终执行waitRead
if err = fd.pd.waitRead(fd.isFile); err == nil {
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
//至此我们看到了pollWait其核心便在此处
//需要注意的是当前函数在internal目录中package poll包下
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
//重命名为poll下的
//go:linkname poll_runtime_pollWait internal_1poll.runtime__pollWait
func poll_runtime_pollWait(ctx uintptr, mode int) int {
//调用block进行阻塞
for !netpollblock(pd, int32(mode), false) {
}
return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
...
//其核心在此处进行park操作
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
...
}
//阻塞是需要执行的函数
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
//将当前协程g设置到gpp地址中,gpp地址便是&pd.rg或者&pd.wg
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
...
//进入此处调用mcall
mcall(park_m)
}
//进行了重命名
void runtime_mcall(FuncVal *fv) {
G *gp;
//获取当前协程g
gp = g;
//此处必然不是空
if(gp != nil) {
gp->fromgogo = false;
//将当前协程go的上下文存储到context中,汇编实现
__go_getcontext(ucontext_arg(&gp->context[0]));
//(原注释)当我们从 getcontext 返回时,我们可能运行在一个新的线程中。这意味着 g 可能已经改变。g 是一个全局变量,所以我们会重新加载它,但 g 的地址可能已经被缓存到我们的本地栈帧中,而这个地址可能是错误的。调用函数来为当前线程重新加载这个值。
gp = runtime_g();
}
//若gp未发生改变那么fromgogo则为false或者当前gp的fromgogo本身也是false则进入此处进行执行
if (gp == nil || !gp->fromgogo) {
//将传入的函数设置到entry
mp->g0->entry = fv;
mp->g0->param = gp;
//然后将g0给全局变量g
g = mp->g0;
//将当前协程g的上下文还原
__go_setcontext(ucontext_arg(&mp->g0->context[0]));
runtime_throw("runtime: mcall function returned");
}
}
至此完成了上下文的切换,但是此处执行的g0和当前任务无关,那么何时调用的传入的park_m呢?核心函数是kickoff。
//前文调用过makeGContext进行构建协程g的上下文
void makeGContext(G* gp, byte* sp, uintptr spsize) {
__go_context_t *uc;
uc = ucontext_arg(&gp->context[0]);
__go_getcontext(uc);
//构建上下文
__go_makecontext(uc, kickoff, sp, (size_t)spsize);
}
//构建上下文
.globl __go_makecontext
.text
__go_makecontext:
addq %rcx, %rdx
andq $~0xf, %rdx
subq $8, %rdx
movq $0, (%rdx)
//将传入的kickoff地址记录到PC_OFF处
movq %rdx, SP_OFF(%rdi)
movq %rsi, PC_OFF(%rdi)
ret
从而得知每个协程构建时都会有个ret返回的pc地址,而该地址便是统一的kickoff函数。那么前文所切换的g0当执行完后则会进入该函数。
func kickoff() {
gp := getg()
//获取当前设置的entry 也即park_m
fv := gp.entry
param := gp.param
...
gp.entrysp = getsp()
//进行调用
fv(param)
goexit1()
}
func park_m(gp *g) {
_g_ := getg()
...
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
//waitunlockf 为netpollblockcommit函数
//waitlock 则是&pd.rg或者&pd.wg
if fn := _g_.m.waitunlockf; fn != nil {
//调用netpollblockcommit
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
}
//进行调度
schedule()
}
至此协程切换完成,那么何时又如何唤醒呢?
//schedule中包含一个函数findrunnable
func findrunnable() (gp *g, inheritTime bool) {
...
//包含如下内容
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
//从netpoll中获取一个列表
if list := netpoll(0); !list.empty() { // non-blocking
//从列表中弹出一个协程g
gp := list.pop()
//将剩余的协程加入到运行队列中
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
...
}
func netpoll(delay int64) gList {
...
var events [128]epollevent
retry:
//进入系统调用进行wait,
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
e := errno()
if e != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", e)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
//上方epollwait若查不出结果则直接返回空,否则进入for迭代事件
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
...
//从ev中获取自定义data 此处为pd(pollDesc)
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
//将其设置为ready状态
netpollready(&toRun, pd, mode)
}
return toRun
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
//根据了下获取pd中对应的协程g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
//将获取的结果加入到运行队列中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
//在park_m中netpollblockcommit 设置了gpp为对应的协程g的地址,此处进行读取
old := gpp.Load()
...
//设置协程地址为pdReady
var new uintptr
if ioready {
new = pdReady
}
if gpp.(old, new) {
if old == pdWait {
old = 0
}
//最终返回协程对象
return (*g)(unsafe.Pointer(old))
}
}
}
这便是协程的全部执行逻辑,前文提到以文件与网络作为案例,而在操作文件是发现他共用了网络的epoll解决阻塞问题。
6.总结
协程的核心在于任务池(协程池),不管是_p_内部的256任务数组,还是全局的sched结构都是为了记录用户通过go所创建的任务。若以该设计来说任何语言均可实现。但是go之所以凸出他还实现了阻塞时的任务间的上下文切换,从而减少了系统调用(若是java线程池,阻塞则进入系统级别的上下文切换,也即将这一套操作交由操作系统完成,而go则时用户级别实现,从而大大的提高性能,可以保证线程不暂停的一直运行下去)。该设计对于io密集型项目性能将由显著的提高,因为当协程遇到阻塞就进行切换,其工作的内容交由操作系统完成,他则继续执行别的协程,当系统执行完阻塞内容后再次将对应协程加入到执行队列中,从而提高阻塞任务的并行数。