本文从上下文Context、同步原语与锁、Channel、调度器四个方面介绍Go语言是如何实现并发的。本文绝大部分内容是从go并发编程系列文章学习总结而来。
上下文Context
上下文context.Context是用来设置截止日期、同步信号,传递请求相关值的结构体。上下文与Gorutine有比较密切的关系。context.Context是Go语言中独特的设计(java中至少没有类似设计)。context.Context是Go语言在1.7版本中引入标准库的接口,该接口定义了四个需要实现的方法,如下:
type Context interface {
Deadline() (deadline time.Time, ok bool) //返回context.Context被取消的时间,也就是完成工作的截止日期;
Done() <-chan struct{} //返回一个Channel,这个Channel会在当前工作完成或者上下文被取消之后关闭,多次调用Done方法返回同一个Channel;
Err() error //返回context.Context结束的原因,它只会在Done返回的Channel被关闭时才会返回非空的值:a)如果context.Context被取消,会返回Canceled错误;b)如果context.Context超时,会返回DeadlineExceeded错误。
Value(key interface{}) interface{} //从context.Context中获取键对应的值
}
context包中提供的 context.Background
、context.TODO
、context.WithDeadline
和 context.WithValue
函数会返回实现该接口的私有结构体
设计原理
使用 Context 同步信号
我们可以通过一个代码片段了解 context.Context
是如何对信号进行同步的。在这段代码中,我们创建了一个过期时间为 1s 的上下文,并向上下文传入 handle
函数,该方法会使用 500ms 的时间处理传入的『请求』:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) //1秒之后会往ctx通道推入一个错误信息
defer cancel()
go handle(ctx, 500*time.Millisecond)
select {
case <-ctx.Done(): // ctx.Done()返回通道,等待通道信息
fmt.Println("main", ctx.Err())
}
}
func handle(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle", ctx.Err())
case <-time.After(duration):
fmt.Println("process request with", duration)
}
}
因为过期时间大于处理时间,所以我们有足够的时间处理该『请求』,运行上述代码会打印出如下所示的内容:
$ go run context.go
process request with 500ms
main context deadline exceeded
handle
函数没有进入超时的 select
分支,但是 main
函数的 select
却会等待 context.Context
的超时并打印出 main context deadline exceeded
。
取消信号
context.WithCancel
函数能够从 context.Context
中衍生出一个新的子上下文并返回用于取消该上下文的函数(CancelFunc)。一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到这一取消信号。
我们直接从 context.WithCancel
函数的实现来看它到底做了什么:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
-
context.newCancelCtx
将传入的上下文包装成私有结构体context.cancelCtx
; -
context.propagateCancel
会构建父子上下文之间的关联,当父上下文被取消时,子上下文也会被取消:
其中cancaler是一个接口,ontext.cancelCtx
实现了对应的方法
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil { //当 parent.Done() == nil,也就是 parent 不会触发取消事件时,当前函数会直接返回;
return
}
select {
case <-done:
child.cancel(false, parent.Err()) // 父上下文已经被取消
return
default:
}
if p, ok := parentCancelCtx(parent); ok { //判断parent是否是带cancel的上下文
p.mu.Lock()
if p.err != nil { //如果已经被取消,child 会立刻被取消;
child.cancel(false, p.err)
} else {
p.children[child] = struct{}{} //如果没有被取消,child 会被加入 parent 的 children 列表中,等待 parent 释放取消信号
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
context.propagateCancel
的作用是在 parent
和 child
之间同步取消和结束的信号,保证在 parent
被取消时,child
也会收到对应的信号,不会发生状态不一致的问题。
context.cancelCtx
实现的几个接口方法也没有太多值得分析的地方,该结构体最重要的方法是 cancel
,这个方法会关闭上下文中的 Channel 并向所有的子上下文同步取消信号:
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
除了 context.WithCancel
之外,context
包中的另外两个函数 context.WithDeadline
和 context.WithTimeout
也都能创建可以被取消的计时器上下文 context.timerCtx
:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // 已经过了截止日期
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
context.WithDeadline
方法在创建 context.timerCtx
的过程中,判断了父上下文的截止日期与当前日期,并通过 time.AfterFunc
创建定时器,当时间超过了截止日期后会调用 context.timerCtx.cancel
方法同步取消信号。
context.timerCtx
结构体内部不仅通过嵌入了context.cancelCtx
结构体继承了相关的变量和方法,还通过持有的定时器 timer
和截止时间 deadline
实现了定时取消这一功能:
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
context.timerCtx.cancel
方法不仅调用了 context.cancelCtx.cancel
,还会停止持有的定时器减少不必要的资源浪费。
同步原语与锁
Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。
Go 语言在 sync
包中提供了用于同步的一些基本原语,包括常见的 sync.Mutex
、sync.RWMutex
、sync.WaitGroup
、sync.Once
和 sync.Cond
:
这些基本原语提高了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级的更高的 Channel 实现同步。
Mutex
Go 语言的 sync.Mutex
由两个字段 state
和 sema
组成。其中 state
表示当前互斥锁的状态,而 sema
是用于控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
上述两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁,非常轻量级。
状态
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked
、mutexWoken
和 mutexStarving
,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放:
在默认情况下,互斥锁的所有状态位都是
0
,int32
中的不同位分别表示了不同的状态:
-
mutexLocked
— 表示互斥锁的锁定状态; -
mutexWoken
— 表示从正常模式被从唤醒; -
mutexStarving
— 当前的互斥锁进入饥饿状态; -
waitersCount
— 当前互斥锁上等待的 Goroutine 个数;
正常模式和饥饿模式
sync.Mutex
有两种模式 — 正常模式和饥饿模式。我们需要在这里先了解正常模式和饥饿模式都是什么,它们有什么样的关系。
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
饥饿模式是在 Go 语言 1.9 版本引入的优化,引入的目的是保证互斥锁的公平性(Fairness)。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
加锁和解锁
互斥锁的加锁和解锁过程使用 sync.Mutex.Lock
和 sync.Mutex.Unlock
方法。
互斥锁的加锁是靠 sync.Mutex.Lock
完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock
方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked
位置成 1:
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow
尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将该方法分成几个部分介绍获取锁的过程:
- 判断当前 Goroutine 能否进入自旋;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
我们先来介绍互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
//互斥锁只有在普通模式才能进入自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { //runtime_canSpin为true条件:1)运行在多 CPU 的机器上;2)当前 Goroutine 为了获取该锁进入自旋的次数小于四次;3)当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // 自旋,该指令只会占用 CPU 并消耗 CPU 时间。
iter++
old = m.state
continue
}
......
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序。
一旦当前 Goroutine 能够进入自旋就会调用sync.runtime_doSpin
和 runtime.procyield
并执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间。
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state
字段中存储的不同信息 — mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
。计算了新的互斥锁状态之后,就会使用 CAS 函数更新状态。
如果我们没有通过 CAS 获得锁,会调用 sync.runtime_SemacquireMutex
使用信号量(关于信号量实现进程/线程之间通信原理可以自行搜索,此处不深入讲解)保证资源不会被两个 Goroutine 获取。sync.runtime_SemacquireMutex
会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock
方法的剩余代码也会继续执行。
- 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
- 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
互斥锁的解锁过程 sync.Mutex.Unlock
与加锁过程相比就很简单,该过程会先使用 AddInt32
函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用
sync.Mutex.unlockSlow
方法开始慢速解锁:
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow
方法首先会校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex
中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
- 在正常模式下,这段代码会分别处理以下两种情况处理;
- 如果互斥锁不存在等待者或者互斥锁的
mutexLocked
、mutexStarving
、mutexWoken
状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者; - 如果互斥锁存在等待者,会通过
sync.runtime_Semrelease
唤醒等待者并移交锁的所有权;
- 如果互斥锁不存在等待者或者互斥锁的
- 在饥饿模式下,上述代码会直接调用
sync.runtime_Semrelease
方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;
小结
我们已经从多个方面分析了互斥锁 sync.Mutex
的实现原理,在这里我们从加锁和解锁两个方面总结一下结论和注意事项。
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,就会直接通过置位
mutexLocked
加锁; - 如果互斥锁处于
mutexLocked
并且在普通模式下工作,就会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
sync.runtime_SemacquireMutex
函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,那么调用
sync.Mutex.Unlock
会直接抛出异常; - 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine;
RWMutex
读写互斥锁 sync.RWMutex
是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
结构体
sync.RWMutex
中总共包含以下 5 个字段:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
-
w
— 复用互斥锁提供的能力; -
writerSem
和readerSem
— 分别用于写等待读和读等待写: -
readerCount
存储了当前正在执行的读操作的数量; -
readerWait
表示当写操作被阻塞时等待的读操作个数;
我们会依次分析获取写锁和读锁的实现原理,其中:
- 写操作使用
sync.RWMutex.Lock
和sync.RWMutex.Unlock
方法; - 读操作使用
sync.RWMutex.RLock
和sync.RWMutex.RUnlock
方法;
写锁
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock
方法:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
- 调用结构体持有的
sync.Mutex
的sync.Mutex.Lock
方法阻塞后续的写操作;- 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠;
- 调用
atomic.AddInt32
方法阻塞后续的读操作: - 如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0),该 Goroutine 会调用
sync.runtime_SemacquireMutex
进入休眠状态等待所有读锁所有者执行结束后释放writerSem
信号量将当前协程唤醒。
写锁的释放会调用 sync.RWMutex.Unlock
方法:
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
与加锁的过程正好相反,写锁的释放分以下几个执行:
- 调用
atomic.AddInt32
函数将变回正数,释放读锁; - 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock
方法释放写锁;
获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。
读锁
读锁的加锁方法 sync.RWMutex.RLock
很简单,该方法会通过 atomic.AddInt32
将 readerCount
加一:
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
- 如果该方法返回负数 — 其他 Goroutine 获得了写锁,当前 Goroutine 就会调用
sync.runtime_SemacquireMutex
陷入休眠等待锁的释放; - 如果该方法的结果为非负数 — 没有 Goroutine 获得写锁,当前方法就会成功返回;
当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock
方法:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
该方法会先减少正在读资源的 readerCount
整数,根据 atomic.AddInt32
的返回值不同会分别进行处理:
- 如果返回值大于等于零 — 读锁直接解锁成功;
- 如果返回值小于零 — 有一个正在执行的写操作,在这时会调用
sync.RWMutex.rUnlockSlow
方法;
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow
会减少获取锁的写操作等待的读操作数 readerWait
并在所有读操作都被释放之后触发写操作的信号量 writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。
小结
读写互斥锁 sync.RWMutex
虽然提供的功能非常复杂,不过因为它建立在 sync.Mutex
上,所以整体的实现上会简单很多。我们总结一下读锁和写锁的关系:
- 调用
sync.RWMutex.Lock
尝试获取写锁时;- 每次
sync.RWMutex.RUnlock
都会将readerWait
其减一,当它归零时该 Goroutine 就会获得写锁; - 将
readerCount
减少rwmutexMaxReaders
个数以阻塞后续的读操作;
- 每次
- 调用
sync.RWMutex.Unlock
释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
WaitGroup
sync.WaitGroup
可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我们可以通过 sync.WaitGroup
将原本顺序执行的代码在多个 Goroutine 中并发执行,加快程序处理的速度。
结构体
sync.WaitGroup
结构体中的成员变量非常简单,其中只包含两个成员变量:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
-
noCopy
— 保证sync.WaitGroup
不会被开发者通过再赋值的方式拷贝; -
state1
— 存储着状态和信号量;
sync.noCopy
是一个特殊的私有结构体,tools/go/analysis/passes/copylock
包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy
结构体,如果包含该结构体就会在运行时报错。
sync.WaitGroup
` 结构体中还包含一个总共占用 12 字节(8 字节用户存储状态,高32位及state1[1]代表counter,低32位state1[0]是等待gorutine数,另外32位state1[2]是信号量)的数组,这个数组会存储当前结构体的状态。
sync.WaitGroup
提供的私有方法 sync.WaitGroup.state
能够帮我们从 state1
字段中取出它的状态和信号量。
接口
sync.WaitGroup
对外暴露了三个方法 — sync.WaitGroup.Add
、sync.WaitGroup.Wait
和 sync.WaitGroup.Done
。
因为其中的 sync.WaitGroup.Done
只是向 sync.WaitGroup.Add
方法传入了 -1,所以我们重点分析另外两个方法 sync.WaitGroup.Add
和 sync.WaitGroup.Wait
:
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add
方法可以更新 sync.WaitGroup
中的计数器 counter
。虽然 sync.WaitGroup.Add
方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数就会发生程序崩溃。当调用计数器归零,也就是所有任务都执行完成时,就会通过 sync.runtime_Semrelease
唤醒处于等待状态的所有 Goroutine。
sync.WaitGroup
的另一个方法 sync.WaitGroup.Wait
会在计数器大于 0 并且不存在等待的 Goroutine 时,调用 sync.runtime_Semacquire
陷入睡眠状态。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
当 sync.WaitGroup
的计数器归零时,当陷入睡眠状态的 Goroutine 就被唤醒,上述方法会立刻返回。
小结
通过对 sync.WaitGroup
的分析和研究,我们能够得出以下结论:
-
sync.WaitGroup
必须在sync.WaitGroup.Wait
方法返回之后才能被重新使用; -
sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,我们可以向sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒其他等待的 Goroutine; - 可以同时有多个 Goroutine 等待当前
sync.WaitGroup
计数器的归零,这些 Goroutine 会被同时唤醒;
Once
Go 语言标准库中 sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
结构体
每一个 sync.Once
结构体中都只包含一个用于标识代码块是否执行过的 done
以及一个互斥锁 sync.Mutex
:
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do
是 sync.Once
结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:
- 如果传入的函数已经执行过,就会直接返回;
- 如果传入的函数没有执行过,就会调用
sync.Once.doSlow
执行传入的函数:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 为当前 Goroutine 获取互斥锁;
- 执行传入的无入参函数;
- 运行延迟函数调用,将成员变量
done
更新成 1;
sync.Once
就会通过成员变量 done
确保函数不会执行第二次。
小结
作为用于保证函数执行次数的 sync.Once
结构体,它使用互斥锁和 sync/atomic
包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:
-
sync.Once.Do
方法中传入的函数只会被执行一次,哪怕函数中发生了panic
; - 两次调用
sync.Once.Do
方法传入不同的函数也只会执行第一次调用的函数;
Cond
Go 语言标准库中的 sync.Cond
一个条件变量,它可以让一系列的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond
结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1*time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
c.Broadcast() //唤起c等待gorutine队列中所有的gorutine
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
c.Wait() //阻塞等待信号
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:
- 10 个 Goroutine 通过
sync.Cond.Wait
等待特定条件的满足; - 1 个 Goroutine 会调用
sync.Cond.Broadcast
方法通知所有陷入等待的 Goroutine;
结构体
sync.Cond
的结构体中包含以下 4 个字段:
type Cond struct {
noCopy noCopy //用于保证结构体不会在编译期间拷贝
L Locker //用于保护内部的 `notify` 字段,`Locker` 接口类型的变量
notify notifyList // 一个 Goroutine 的链表,它是实现同步机制的核心结构
checker copyChecker //用于禁止运行期间发生的拷贝
}
type notifyList struct {
wait uint32 //表示当前正在等待的 Goroutine
notify uint32 //表示当前已经通知到的 Goroutine
lock mutex
head *sudog //指向的链表的头
tail *sudog //指向的链表的尾
}
接口
sync.Cond
对外暴露的 sync.Cond.Wait
方法会将当前 Goroutine 陷入休眠状态,它的执行过程如下:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的链接名。将等待计数器加一并解锁
c.L.Unlock()
runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的链接名。 等待其他 Goroutine 的唤醒并加锁
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
runtime.notifyListWait
函数会获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
除了将当前 Goroutine 追加到链表的末端之外,我们还会调用 runtime.goparkunlock
将当前 Goroutine 陷入休眠状态,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。
sync.Cond.Signal
和 sync.Cond.Broadcast
方法就是用来唤醒调用 sync.Cond.Wait
陷入休眠的 Goroutine,它们两个的实现有一些细微差别:
-
sync.Cond.Signal
方法会唤醒队列最前面的 Goroutine; -
sync.Cond.Broadcast
方法会唤醒队列中全部的 Goroutine;
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime.notifyListNotifyOne
函数只会从 sync.notifyList
链表中找到满足 sudog.ticket == l.notify
条件的 Goroutine 并通过 readyWithTime
唤醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll
会依次通过 runtime.readyWithTime
函数唤醒链表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的 Goroutine 需要等待调度器的调度。
在一般情况下,我们都会先调用 sync.Cond.Wait
陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal
或者 sync.Cond.Broadcast
唤醒一个或者全部的 Goroutine。
小结
sync.Cond
不是一个常用的同步机制,在遇到长时间条件无法满足时,与使用 for {}
进行忙碌等待相比,sync.Cond
能够让出处理器的使用权。在使用的过程中我们需要注意以下问题:
-
sync.Cond.Wait
方法在调用之前一定要使用获取互斥锁,否则会触发程序崩溃; -
sync.Cond.Signal
方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine; -
sync.Cond.Broadcast
会按照一定顺序广播通知等待的全部 Goroutine;
扩展原语
除了标准库中提供的同步原语之外,Go 语言还在子仓库 sync 中提供了四种扩展原语,x/sync/errgroup.Group
、x/sync/semaphore.Weighted
、x/sync/singleflight.Group
和 x/sync/syncmap.Map
,其中的 x/sync/syncmap.Map
在 1.9 版本中被移植到了标准库中。
下面简单介绍 Go 语言在扩展包中提供的三种同步原语,也就是 x/sync/errgroup.Group
、x/sync/semaphore.Weighted
和 x/sync/singleflight.Group
。
ErrGroup
x/sync/errgroup.Group
就为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能。
x/sync/errgroup.Group.Go
方法能够创建一个 Goroutine 并在其中执行传入的函数,而 x/sync/errgroup.Group.Wait
会等待所有 Goroutine 全部返回,该方法的不同返回结果也有不同的含义:
- 如果返回错误 — 这一组 Goroutine 最少返回一个错误;
- 如果返回空值 — 所有 Goroutine 都成功执行;
结构体
x/sync/errgroup.Group
结构体组成如下:
type Group struct {
cancel func() //创建context.Context时返回的取消函数,用于在多个 Goroutine 之间同步取消信号;
wg sync.WaitGroup //用于等待一组 Goroutine 完成子任务的同步原语
errOnce sync.Once //用于保证只接收一个子任务返回的错误
err error
}
这些字段共同组成了 x/sync/errgroup.Group
结构体并为我们提供同步、错误传播以及上下文取消等功能。
x/sync/errgroup.Group
的实现没有涉及底层和运行时包中的 API,它只是对基本同步语义进行了封装以提供更加复杂的功能。在使用时,我们也需要注意以下的几个问题:
-
x/sync/errgroup.Group
在出现错误或者等待结束后都会调用context.Context
的cancel
方法同步取消信号; - 只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃;
Semaphore
信号量是在并发编程中常见的一种同步机制,在需要控制访问资源的进程数量时就会用到信号量,它会保证持有的计数器在 0 到初始化的权重之间波动。
- 每次获取资源时都会将信号量中的计数器减去对应的数值,在释放时重新加回来;
- 当遇到计数器大于信号量大小时就会进入休眠等待其他线程释放信号;
Go 语言的扩展包中就提供了带权重的信号量 x/sync/semaphore.Weighted
,我们可以按照不同的权重对资源的访问进行管理,这个结构体对外也只暴露了四个方法:
-
x/sync/semaphore.NewWeighted
用于创建新的信号量; -
x/sync/semaphore.Weighted.Acquire
阻塞地获取指定权重的资源,如果当前没有空闲资源,就会陷入休眠等待; -
x/sync/semaphore.Weighted.TryAcquire
非阻塞地获取指定权重的资源,如果当前没有空闲资源,就会直接返回false
; -
x/sync/semaphore.Weighted.Release
用于释放指定权重的资源;
结构体
x/sync/semaphore.NewWeighted
方法能根据传入的信号量最大权重创建一个 x/sync/semaphore.Weighted
结构体指针:
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
x/sync/semaphore.Weighted
结构体中包含一个 waiters
列表,其中存储着等待获取资源的 Goroutine,除此之外它还包含当前信号量的上限以及一个计数器 cur
,这个计数器的范围就是 [0, size]:
信号量中的计数器会随着用户对资源的访问和释放进行改变,引入的权重概念能够提供更细粒度的资源的访问控制,尽可能满足常见的用例。
带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:
-
x/sync/semaphore.Weighted.Acquire
和x/sync/semaphore.Weighted.TryAcquire
方法都可以用于获取资源,前者会阻塞地获取信号量,后者会非阻塞地获取信号量; -
x/sync/semaphore.Weighted.Release
方法会按照 FIFO 的顺序唤醒可以被唤醒的 Goroutine; - 如果一个 Goroutine 获取了较多地资源,由于
x/sync/semaphore.Weighted.Release
的释放策略可能会等待比较长的时间;
SingleFlight
x/sync/singleflight.Group
是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是 — 我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
x/sync/singleflight.Group
能有效地解决这个问题,它能够限制对同一个 Key
的多次重复请求,减少对下游的瞬时流量。
在资源的获取非常昂贵时(例如:访问缓存、数据库),就很适合使用 x/sync/singleflight.Group
对服务进行优化。我们来了解一下它的使用方法:
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
因为请求的哈希在业务上一般表示相同的请求,所以上述代码使用它作为请求的键。当然,我们也可以选择其他的唯一字段作为 x/sync/singleflight.Group.Do
方法的第一个参数减少重复的请求。
结构体
x/sync/singleflight.Group
结构体由一个互斥锁sync.Mutex
和一个映射表组成,每一个 x/sync/singleflight.call
结构体都保存了当前调用对应的信息:
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
x/sync/singleflight.call
结构体中的 val
和 err
字段都只会在执行传入的函数时赋值一次并在 sync.WaitGroup.Wait
返回时被读取;dups
和 chans
两个字段分别存储了抑制的请求数量以及用于同步结果的 Channel。
当我们需要减少对下游的相同请求时,就可以使用 x/sync/singleflight.Group
来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:
-
x/sync/singleflight.Group.Do
和x/sync/singleflight.Group.DoChan
一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值; -
x/sync/singleflight.Group.Forget
方法可以通知x/sync/singleflight.Group
在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了; - 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误;
Channel
下面介绍管道 Channel 的设计原理、数据结构和常见操作,例如 Channel 的创建、发送、接收和关闭。作为 Go 核心的数据结构和 Goroutine 之间的通信方式,Channel 是支撑 Go 语言高性能并发编程模型的重要结构,我们首先需要了解 Channel 背后的设计原理以及它的底层数据结构。
设计原理
Go 语言中最常见的、也是经常被人提及的设计模式就是 — 不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程冲突的问题,我们需要限制同一时间能够读写这些变量的线程数量,这与 Go 语言鼓励的方式并不相同。
虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,也就是通信顺序进程(Communicating sequential processes,CSP)。两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据。Channel 分为同步Channel和异步Channel:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
数据结构
Go 语言的 Channel 在运行时使用 runtime.hchan
结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构体:
type hchan struct {
qcount uint //Channel 中的元素个数
dataqsiz uint //Channel 中的循环队列的长度
buf unsafe.Pointer //Channel 的缓冲区数据指针
elemsize uint16 //当前 Channel 能够收发的元素大小
closed uint32
elemtype *_type //当前 Channel 能够收发的元素类型
sendx uint //Channel 的发送操作处理到的位置
recvx uint //Channel 的接收操作处理到的位置
recvq waitq // 当前 Channel 由于缓冲区空间不足而阻塞的接收 Goroutine 列表
sendq waitq //当前 Channel 由于缓冲区空间不足而阻塞的发送 Goroutine 列表
lock mutex
}
等待队列使用双向链表 `runtime.waitq`
type waitq struct {
first *sudog
last *sudog
}
runtime.sudog
表示一个在等待列表中的 Goroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后 runtime.sudog
的指针。
创建管道
Go 语言中所有 Channel 的创建都会使用 make
关键字。编译器会将 make(chan int, 10)
表达式被转换成 OMAKE
类型的节点,并在类型检查阶段将 OMAKE
类型的节点转换成 OMAKECHAN
类型:
func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OMAKE:
...
switch t.Etype {
case TCHAN:
l = nil
if i < len(args) { // 带缓冲区的异步 Channel
...
n.Left = l
} else { // 不带缓冲区的同步 Channel
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
}
}
这一阶段会对传入 make
关键字的缓冲区大小进行检查,如果我们不向 make
传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。
OMAKECHAN
类型的节点最终都会在 SSA 中间代码生成阶段之前被转换成调用 runtime.makechan
或者 runtime.makechan64
的函数:
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
runtime.makechan
和 runtime.makechan64
会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况,我们重点关注 runtime.makechan
函数:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0: //当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan分配一段内存空间;
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0: //Channel 中存储的类型不是指针类型,就会为当前的 Channel 和底层的数组分配一块连续的内存空间;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 在默认情况下会单独为runtime.hchan和缓冲区分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
在函数的最后会统一更新 runtime.hchan
的 elemsize
、elemtype
和 dataqsiz
几个字段。
发送数据
当我们想要向 Channel 发送数据时,就需要使用 ch <- i
语句,编译器会将它解析成 OSEND
节点并在 cmd/compile/internal/gc.walkexpr
函数中转换成 runtime.chansend1
:
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OSEND:
n1 := n.Right
n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
n1 = walkexpr(n1, init)
n1 = nod(OADDR, n1, nil)
n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
}
}
runtime.chansend1
只是调用了 runtime.chansend
并传入 Channel 和需要发送的数据。runtime.chansend
是向 Channel 中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑,如果我们在调用时将 block
参数设置成 true
,那么就表示当前发送操作是一个阻塞操作:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止发生竞争条件。如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报"send on closed channel"
错误并中止程序。
因为 runtime.chansend
函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的三个部分:
- 当存在等待的接收者时,通过
runtime.send
直接将数据发送给阻塞的接收者; - 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
- 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;
直接发送
如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend
函数会从接收队列 recvq
中取出最先陷入等待的 Goroutine 并直接向它发送数据:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
发送数据时会调用 runtime.send
,该函数的执行可以分成两个部分:
- 调用
runtime.sendDirect
函数将发送的数据直接拷贝到x = <-c
表达式中变量x
所在的内存地址上; - 调用
runtime.goready
将等待接收数据的 Goroutine 标记成可运行状态Grunnable
并把该 Goroutine 放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度时就会立刻唤醒数据的接收方;
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext
中,程序没有立刻执行该 Goroutine。
缓冲区
如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,就会执行下面这段代码:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
在这里我们首先会使用 chanbuf
计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove
将发送的数据拷贝到缓冲区中并增加 sendx
索引和 qcount
计数器。
如果当前 Channel 的缓冲区未满,向 Channel 发送的数据会存储在 Channel 中 sendx
索引所在的位置并将 sendx
索引加一,由于这里的 buf
是一个循环数组,所以当 sendx
等于 dataqsiz
时就会重新回到数组开始的位置。
阻塞发送
当 Channel 没有接收者能够处理数据时,向 Channel 发送数据就会被下游阻塞,当然使用 select
关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞地发送数据会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
gp := getg() //获取发送数据使用的 Goroutine
mysg := acquireSudog() //函数获取 runtime.sudog结构体并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 Select 控制结构中和待发送数据的内存地址等
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg) //将刚刚创建并初始化的 runtime.sudog加入发送等待队列,并设置到当前 Goroutine 的 `waiting` 上,表示 Goroutine 正在等待该 `sudog` 准备就绪
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) //将当前的 Goroutine 陷入沉睡等待唤醒
//被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 untime.sudog结构体
gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true //返回 `true` 表示这向 Channel 发送数据的结束
}
小结
我们在这里可以简单梳理和总结一下使用 ch <- i
表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的
recvq
上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前的 Goroutine 并将其设置成下一个运行的 Goroutine; - 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们就会直接将数据直接存储到当前缓冲区
sendx
所在的位置上; - 如果不满足上面的两种情况,就会创建一个
runtime.sudog
结构并将其加入 Channel 的sendq
队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext
属性,但是并不会立刻触发调度; - 发送数据时并没有找到接收方并且缓冲区已经满了,这时就会将自己加入 Channel 的
sendq
队列并调用runtime.goparkunlock
触发 Goroutine 的调度让出处理器的使用权;
接收数据
我们接下来继续介绍 Channel 操作的另一方 — 数据的接收。Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
i <- ch
i, ok <- ch
这两种不同的方法经过编译器的处理都会变成 ORECV
类型的节点,后者会在类型检查阶段被转换成 OAS2RECV
类型。
虽然不同的接收方式会被转换成 runtime.chanrecv1
和 runtime.chanrecv2
两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv
。
当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark
直接让出处理器的使用权。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
如果当前 Channel 已经被关闭并且缓冲区中不存在任何的数据,那么就会清除 ep
指针中的数据并立刻返回。
除了上述两种特殊情况,使用 runtime.chanrecv
从 Channel 接收数据时还包含以下三种不同情况:
- 当存在等待的发送者时,通过
runtime.recv
直接从阻塞的发送者或者缓冲区中获取数据; - 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
- 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据;
这三种情况和信息发送是一一对应的,此处不再赘述。
小结
我们梳理一下从 Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么就会直接调用
runtime.gopark
挂起当前 Goroutine; - 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv
函数会直接返回; - 如果 Channel 的
sendq
队列中存在挂起的 Goroutine,就会将recvx
索引所在的数据拷贝到接收变量所在的内存空间上并将sendq
队列中 Goroutine 的数据拷贝到缓冲区; - 如果 Channel 的缓冲区中包含数据就会直接读取
recvx
索引对应的数据; - 在默认情况下会挂起当前的 Goroutine,将
runtime.sudog
结构加入recvq
队列并陷入休眠等待调度器的唤醒;
我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
关闭管道
编译器会将用于关闭管道的 close
关键字转换成 OCLOSE
节点以及 runtime.closechan
的函数调用。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接 panic
并抛出异常:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq
和 sendq
两个队列中的数据加入到 Goroutine 列表 gList
中,与此同时该函数会清除所有 sudog
上未被处理的元素:
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready
触发调度。
调度器
每个线程会都占用 1M以上的内存空间,在对线程进行切换时不止会消耗较多的内存,恢复寄存器中的内容还需要向操作系统申请或者销毁对应的资源,每一次线程上下文的切换都需要消耗 ~1us 左右的时间,但是 Go 调度器对 Goroutine 的上下文切换约为 ~0.2us,减少了 80% 的额外开销。
Go 语言的调度器通过使用与CPU 数量相等的线程减少线程频繁切换的内存开销,同时在每一个线程上执行额外开销更低的 Goroutine 来降低操作系统和硬件的负载。
设计原理
基于任务窃取的 Go 语言调度器使用了沿用至今的 G-M-P 模型,我们能在 runtime: improved scheduler 提交中找到任务窃取调度器刚被实现时的源代码,调度器的 runtime.schedule
函数在这个版本的调度器中反而更简单了:
static void schedule(void) {
G *gp;
top:
if(runtime·gcwaiting) {
gcstopm(); //如果当前运行时在等待垃圾回收
goto top;
}
//从本地或者全局的运行队列中获取待执行的 Goroutine
gp = runqget(m->p);
if(gp == nil)
gp = findrunnable();
...
//在当前线程 M 上运行 Goroutine
execute(gp);
}
当前处理器本地的运行队列中不包含 Goroutine 时,调用 findrunnable
函数会触发工作窃取,从其它的处理器的队列中随机获取一些 Goroutine。
运行时 G-M-P 模型中引入的处理器 P 是线程和 Goroutine 的中间层,我们从它的结构体中就能看到处理器与 M 和 G 的关系:
struct P {
Lock;
uint32 status;
P* link;
uint32 tick;
M* m;
MCache* mcache;
G** runq;
int32 runqhead;
int32 runqtail;
int32 runqsize;
G* gfree;
int32 gfreecnt;
};
处理器持有一个由可运行的 Goroutine 组成的运行队列 runq
,还反向持有一个线程。调度器在调度时会从处理器的队列中选择队列头的 Goroutine 放到线程 M 上执行。
基于工作窃取的多线程调度器将每一个线程绑定到了独立的 CPU 上,这些线程会被不同处理器管理,不同的处理器通过工作窃取对任务进行再分配实现任务的平衡,也能提升调度器和 Go 语言程序的整体性能,今天所有的 Go 语言服务都受益于这一改动。
数据结构
调度器的三个重要组成部分 — 线程 M、Goroutine G 和处理器 P:
G — 表示 Goroutine,它是一个待执行的任务;
M — 表示操作系统的线程,它由操作系统的调度器调度和管理;
P — 表示处理器,它可以被看做运行在线程上的本地调度器;
下面详细介绍它们的作用、数据结构以及在运行期间可能处于的状态。
G
Gorotuine 就是 Go 语言调度器中待执行的任务,它在运行时调度器中的地位与线程在操作系统中差不多,但是它占用了更小的内存空间,也降低了上下文切换的开销。
Goroutine 只存在于 Go 语言的运行时,它是 Go 语言在用户态提供的线程,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的 CPU。
Goroutine 在 Go 语言运行时使用私有结构体 runtime.g
表示。这个私有结构体非常复杂,总共包含 40 多个用于表示各种状态的成员变量,我们在这里也不会介绍全部字段,而是会挑选其中的一部分进行介绍:
type g struct {
stack stack //stack 字段描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi)
stackguard0 uintptr //用于调度器抢占式调度
preempt bool // 抢占信号
preemptStop bool // 抢占时将状态修改成 `_Gpreempted`
preemptShrink bool // 在同步安全点收缩栈
_panic *_panic // 最内侧的 panic 结构体
_defer *_defer // 最内侧的延迟函数结构体
m *m //当前 Goroutine 占用的线程,可能为空;
sched gobuf //存储 Goroutine 的调度相关的数据
atomicstatus uint32 //Goroutine 的状态
goid int64 //Goroutine 的 ID,该字段对开发者不可见,Go 团队认为引入 ID 会让部分 Goroutine 变得更特殊,从而限制语言的并发能力
}
type gobuf struct {
sp uintptr //栈指针(Stack Pointer)
pc uintptr //程序计数器(Program Counter)
g guintptr //持有 runtime.gobuf
ret sys.Uintreg //系统调用的返回值
...
}
这个在调度器保存或者恢复上下文的时候用到,其中的栈指针和程序计数器会用来存储或者恢复寄存器中的值,改变程序即将执行的代码。
结构体 runtime.g
的 atomicstatus
字段就存储了当前 Goroutine 的状态。除了几个已经不被使用的以及与 GC 相关的状态之外,Goroutine 可能处于以下 9 个状态:
状态 | 描述 |
---|---|
_Gidle |
刚刚被分配并且还没有被初始化 |
_Grunnable |
没有执行代码,没有栈的所有权,存储在运行队列中 |
_Grunning |
可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P |
_Gsyscall |
正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上 |
_Gwaiting |
由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上 |
_Gdead |
没有被使用,没有执行代码,可能有分配的栈 |
_Gcopystack |
栈正在被拷贝,没有执行代码,不在运行队列上 |
_Gpreempted |
由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒 |
_Gscan |
GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在 |
上述状态中比较常见是 _Grunnable
、_Grunning
、_Gsyscall
、_Gwaiting
和 _Gpreempted
五个状态,我们会重点介绍这几个状态,Goroutine 的状态迁移是一个复杂的过程,触发 Goroutine 状态迁移的方法也很多,在这里我们也没有办法介绍全部的迁移线路,我们会从中选择一些进行介绍。
虽然 Goroutine 在运行时中定义的状态非常多而且复杂,但是我们可以将这些不同的状态聚合成最终的三种:等待中、可运行、运行中,在运行期间我们会在这三种不同的状态来回切换:
- 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括
_Gwaiting
、_Gsyscall
和_Gpreempted
几个状态; - 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即
_Grunnable
; - 运行中:Goroutine 正在某个线程上运行,即
_Grunning
;
上图展示了 Goroutine 状态迁移的常见路径,其中包括创建 Goroutine 到 Goroutine 被执行、触发系统调用或者抢占式调度器的状态迁移过程。
M
Go 语言并发模型中的 M 是操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS
个活跃线程能够正常运行。
在默认情况下,运行时会将 GOMAXPROCS
设置成当前机器的核数,我们也可以使用 runtime.GOMAXPROCS
来改变程序中最大的线程数。
在默认情况下,一个四核机器上会创建四个活跃的操作系统线程,每一个线程都对应一个运行时中的 runtime.m
结构体。
在大多数情况下,我们都会使用 Go 的默认设置,也就是线程数等于 CPU 个数,在这种情况下不会触发操作系统的线程调度和上下文切换,所有的调度都会发生在用户态,由 Go 语言调度器触发,能够减少非常多的额外开销。
操作系统线程在 Go 语言中会使用私有结构体 runtime.m
来表示,这个结构体中也包含了几十个私有的字段,我们依然对其进行了删减,先来了解几个与 Goroutine 直接相关的字段:
type m struct {
g0 *g
curg *g
...
}
其中 g0 是持有调度栈的 Goroutine,curg
是在当前线程上运行的用户 Goroutine,这也是操作系统线程唯一关心的两个 Goroutine。
g0 是一个运行时中比较特殊的 Goroutine,它会深度参与运行时的调度过程,包括 Goroutine 的创建、大内存分配和 CGO 函数的执行。在后面的小节中,我们会经常看到 g0 的身影。runtime.m
结构体中还存在着三个处理器字段,它们分别表示正在运行代码的处理器 p
、暂存的处理器 nextp
和执行系统调用之前的使用线程的处理器 oldp
:
type m struct {
p puintptr
nextp puintptr
oldp puintptr
}
除了在上面介绍的字段之外,runtime.m
中还包含大量与线程状态、锁、调度、系统调用有关的字段,我们会在分析调度过程时详细介绍。
P
调度器中的处理器 P 是线程和 Goroutine 的中间层,它能提供线程需要的上下文环境,也会负责调度线程上的等待队列,通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时切换,提高线程的利用率。
因为调度器在启动时就会创建 GOMAXPROCS
个处理器,所以 Go 语言程序的处理器数量一定会等于 GOMAXPROCS
,这些处理器会绑定到不同的内核线程上并利用线程的计算资源运行 Goroutine。
runtime.p
是处理器的运行时表示,作为调度器的内部实现,它包含的字段也非常多,其中包括与性能追踪、垃圾回收和计时器相关的字段,这些字段也非常重要,但是在这里就不一一展示了,我们主要关注处理器中的线程和运行队列:
type p struct {
m muintptr
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
...
}
反向存储的线程维护着线程与处理器之间的关系,而 runhead
、runqtail
和 runq
三个字段表示处理器持有的运行队列,其中存储着待执行的 Goroutine 列表,runnext
中是线程下一个需要执行的 Goroutine。
runtime.p
结构体中的状态 status
字段会是以下五种中的一种:
状态 | 描述 |
---|---|
_Pidle |
处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空 |
_Prunning |
被线程 M 持有,并且正在执行用户代码或者调度器 |
_Psyscall |
没有执行用户代码,当前线程陷入系统调用 |
_Pgcstop |
被线程 M 持有,当前处理器由于垃圾回收被停止 |
_Pdead |
当前处理器已经不被使用 |
通过分析处理器 P 的状态,我们能够对处理器的工作过程有一些简单理解,例如处理器在执行用户代码时会处于 _Prunning
状态,在当前线程执行 I/O 操作时会陷入 _Psyscall
状态。
小结
我们简单介绍了 Go 语言调度器中常见的数据结构,包括线程 M、处理器 P 和 Goroutine G,它们在 Go 语言运行时中分别使用不同的私有结构体表示,我们在下面会深入分析 Go 语言调度器的实现原理。
调度器启动
调度器的启动过程是我们平时比较难以接触的过程,不过作为程序启动前的准备工作,理解调度器的启动过程对我们理解调度器的实现原理很有帮助,运行时通过 runtime.schedinit
函数初始化调度器:
func schedinit() {
_g_ := getg()
...
sched.maxmcount = 10000
...
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
在调度器初始函数执行的过程中会将 maxmcount
设置成 10000,这也就是一个 Go 语言程序能够创建的最大线程数,虽然最多可以创建 10000 个线程,但是可以同时运行的线程还是由 GOMAXPROCS
变量控制。
我们从环境变量 GOMAXPROCS
获取了程序能够同时运行的最大处理器数之后就会调用 runtime.procresize
更新程序中处理器的数量,在这时整个程序不会执行任何用户 Goroutine,调度器也会进入锁定状态,runtime.procresize
是调度器启动的最后一步,在这一步过后调度器会完成相应数量处理器的启动,等待用户创建运行新的 Goroutine 并为 Goroutine 调度处理器资源。
创建 Goroutine
想要启动一个新的 Goroutine 来执行任务时,我们需要使用 Go 语言中的 go
关键字,这个关键字会在编译期间通过以下方法 cmd/compile/internal/gc.state.stmt
和 cmd/compile/internal/gc.state.call
两个方法将该关键字转换成 runtime.newproc
函数调用,该函数会接收大小和表示函数的指针 funcval
。在这个函数中我们还会获取 Goroutine 以及调用方的程序计数器,然后调用 runtime.newproc1
函数:
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
newproc1(fn, (*uint8)(argp), siz, gp, pc)
}
runtime.newproc1
会根据传入参数初始化一个 g
结构体,该函数实现可以分成以下几个部分:
- 获取或者创建新的 Goroutine 结构体,先从处理器的
gFree
列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,就会通过runtime.malg
函数创建一个栈大小足够的新结构体; - 将传入的参数移到 Goroutine 的栈上,调用
runtime.memmove
函数将fn
函数的全部参数拷贝到栈上; - 更新 Goroutine 调度相关的属性,
runtime.newproc1
会设置新的 Goroutine 结构体的参数,包括栈指针、程序计数器并更新其状态到_Grunnable
; - 将 Goroutine 加入处理器的运行队列,将初始化好的 Goroutine 加入处理器的运行队列并在满足条件时调用
runtime.wakep
函数唤醒新的处理执行 Goroutine;
调度循环
调度器启动之后,Go 语言运行时会调用 runtime.mstart
以及 runtime.mstart1
,前者会初始化 g0 的 stackguard0
和 stackguard1
字段,后者会初始化线程并调用 runtime.schedule
进入调度循环:
func schedule() {
_g_ := getg()
top:
var gp *g
var inheritTime bool
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable()
}
execute(gp, inheritTime)
}
runtime.schedule
函数的会从不同地方查找待执行的 Goroutine:
- 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过
schedtick
保证有一定几率会从全局的运行队列中查找对应的 Goroutine; - 从处理器本地的运行队列中查找待执行的 Goroutine;
- 如果前两种方法都没有找到 Goroutine,就会通过
runtime.findrunnable
进行阻塞地查找 Goroutine;
runtime.findrunnable
函数的实现非常复杂,这个 300 多行的函数通过以下的过程获取可运行的 Goroutine:
- 从本地运行队列、全局运行队列中查找;
- 从网络轮询器中查找是否有 Goroutine 等待运行;
- 通过
runtime.runqsteal
函数尝试从其他随机的处理器中窃取待运行的 Goroutine,在该过程中还可能窃取处理器中的计时器;
因为函数的实现过于复杂,上述执行过程是经过大量简化的,总而言之,当前函数一定会返回一个可执行的 Goroutine,如果当前不存在就会阻塞等待。
接下来由 runtime.execute
函数执行获取的 Goroutine,做好准备工作后,它会通过 runtime.gogo
将 Goroutine 调度到当前线程上。
func execute(gp *g, inheritTime bool) {
_g_ := getg()
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
gogo(&gp.sched)
}
runtime.gogo
在不同处理器架构上的实现都不同,但是不同的实现也都大同小异,下面是该函数在 386 架构上的实现:
TEXT runtime·gogo(SB), NOSPLIT, $8-4
MOVL buf+0(FP), BX // 获取调度信息
MOVL gobuf_g(BX), DX
MOVL 0(DX), CX // 保证 Goroutine 不为空
get_tls(CX)
MOVL DX, g(CX)
MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中
MOVL gobuf_ret(BX), AX
MOVL gobuf_ctxt(BX), DX
MOVL $0, gobuf_sp(BX)
MOVL $0, gobuf_ret(BX)
MOVL $0, gobuf_ctxt(BX)
MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器
JMP BX // 开始执行
该函数的实现非常巧妙,它从 runtime.gobuf
中取出了 runtime.goexit
的程序计数器和待执行函数的程序计数器,其中:
-
runtime.goexit
的程序计数器被放到了栈 SP 上; - 待执行函数的程序计数器被放到了寄存器 BX 上;
runtime.gogo
就利用了 Go 语言的调用惯例成功模拟这一调用过程,通过以下几个关键指令模拟 CALL
的过程:
MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中
MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器
JMP BX // 开始执行
当 Goroutine 中运行的函数返回时就会跳转到 runtime.goexit
所在位置执行该函数:
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)
func goexit1() {
mcall(goexit0)
}
经过套娃似的一系列函数调用,我们最终在当前线程的 g0 的栈上调用 runtime.goexit0
函数,该函数会将 Goroutine 转换会 _Gdead
状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfput
重新加入处理器的 Goroutine 空闲列表 gFree
:
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
...
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
}
在最后 runtime.goexit0
函数会重新调用 runtime.schedule
触发新的 Goroutine 调度,我们可以认为调度循环永远都不会返回。
Go 语言中的运行时调度循环会从 runtime.schedule
函数开始,最终又回到 runtime.schedule
;这里介绍的是 Goroutine 正常执行并退出的逻辑,实际情况会复杂得多,多数情况下 Goroutine 的执行的过程中都会经历协作式或者抢占式调度,这时会让出线程的使用权等待调度器的唤醒。