现有的讲解锁的文章一般都是分析源码,但是很多并没有解释很多为什么要这么实现的原因,本文尝试从设计一个锁的角度分析 sync.Mutex
实现锁的目的
保护临界资源,防止出现并发问题。具体在go语言中就是解决多个协程并发访问某些公共变量可能出现的问题。
如何设计一把锁
最简单的方式可以设置一个标志位,各个并发单元通过一些原子操作(CAS操作)获取这个标志
获取锁1.0版
for{
if 修改标志成功
执行业务逻辑
解锁
退出
else
继续循环
}
如下图
这样有什么问题呢?
- 大量的这样的协程会占用很高的系统资源,具体在go语言里面,虽然用户代码是停在了for循环里,但是go的runtime并没有认为协程在休眠或者阻塞,调度器会不断地调度这些协程执行。
- 在锁竞争比较激烈的场景很有可能会有协程长时间获取不到锁。
总结来说就是资源占用高并且不够公平,那么如何解决呢?
- 对于占用资源的问题,只要让在等待的协程休眠,在适当的时候唤醒就好。
- 对于公平问题,可以使用队列,让等待协程按照一定的顺序获取锁。
对此,go语言实现了sema包下的semaphore来作为协程使用的信号量。虽然名字角信号量但是使用上更偏向是阻塞和唤醒协程的一种方式,它可以让需要阻塞的协程按照FIFO或者LIFO的方式排队。
锁2.0 加锁
if 修改标志成功
执行业务逻辑
解锁
退出
else
阻塞,等待被唤醒
锁2.0 解锁
唤醒协程
如下图
这样的实现还是会有问题
所有抢锁失败的协程都去排队的话锁的吞吐量会并不会很高。这里我认为的原因是go的runtime调度协程也是有开销的,semaphore唤醒协程的主要过程是将协程的状态改为_Grunning然后等待被调度执行。这里不是立即执行而是要等待一段时间(极端情况下可能要10ms之后才会执行)。当然semaphore可以提供立即执行唤醒的。但是总体上和正在cpu上执行的协程相比,需要调度的操作还是花销比较大。(估计有2个数量级)
所以sync.Mutex结合了以上两种方式实现了随机抢锁和排队抢锁配合的协程管理方式,并且加入了自旋操作来方便快速获取锁。
原理分析
先看锁的结构
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 表示锁的状态 如下图
sema uint32 // 用于传递给semaphore当作信号量 根据sema字段的地址映射到对应的平衡树root节点 再查找到对应的等待队列
}
mutexLocked = 1 << iota // mutex is locked
mutexWoken // 表示目前是否有goroutine被唤醒
mutexStarving //表示锁是否处于饥饿模式
mutexWaiterShift = iota // 增加或者减少等待协程数量的偏移
前面说到sync.Mutex使用了随机抢锁和排队抢锁结合的方式,简单来说就是大部分第一次获取锁失败的协程会先尝试多次自旋,尝试不唤醒队列中的协程,让当前协程更容易抢到锁,如果都失败则会陷入阻塞。而参与随机抢锁的大部分都是目前处于cpu上的协程,并且相对于队列中的协程来说更容易抢到锁,这会导致队列中的协程长时间抢不到锁。所以sync.Mutex设置了锁的“饥饿模式”来应对这种情况。具体如下
- 正常模式下 刚唤醒的协程如果竞争锁失败则放回到等待队列头部(正常是fifo),因为下一个唤醒还得是它。
-
饥饿模式下 刚唤醒的协程直接获得锁,新到来的协程则不自旋也不尝试获取锁 而是直接放到等待队列尾部
那么如何切换这两种模式呢?
这里用源码注释中的说法,把在队列中等待的协程叫做waiter。
- 如果刚被唤醒的waiter未能获取锁超过1ms,它将尝试将锁切换到饥饿模式
- 如果刚被唤醒的waiter看到它是队列中的最后一个waiter或者它等待的时间小于1ms,它将尝试锁切回正常模式。
这样就最大程度保证了公平和较高的吞吐量
到这里的话sync.Mutex的大致原理就结束了,接下来是源码分析。
sync.Mutex源码并不是很长,但是涉及的状态很多,不是很好看懂。这里我提出我认为比较关键的两个点:
- 需要知道sync.Mutex的状态的管理是协作式的,每个在尝试抢锁的协程都有可能可以修改它的状态
- 需要区分协程获取到锁和获取到锁的控制权的不同。获取到锁是函数可以退出执行后面的代码了,获取到锁的控制权是CAS操作成功,可以修改锁的状态
- 以下是源码中的注释:
在正常模式下,waiters按FIFO顺序排队,但被唤醒的waiter不拥有互斥,并与新到达的goroutine竞争控制权。新来的goroutines有一个优势——它们是已经在CPU上运行了,而且可能有很多,所以waiter很有可能会输。在这种情况下,这个被唤醒的waiter会重新加入到队列的最前面。如果waiter未能获取互斥超过1ms,它将锁切换到饥饿模式。在饥饿模式下,锁的控制权直接从刚解锁的goroutine(通过runtime_Semrelease内部的调度,直接继承刚解锁的goroutine的时间片)交给排在队伍最前面的water。新到达的goroutine不会试图获取锁,即使锁现在是解开的状态,也不会试图自旋,而是直接加在等待队列的尾部。如果waiter接收到锁的控制权,并且看到它是队列中的最后一个waiter或者它等待的时间小于1ms,它将锁切回正常模式。正常模式具有相当好的性能,因为goroutine可以获得锁多次,即使有被阻塞的waiters。饥饿模式对于预防无法控制的尾端延迟的很重要。
首先介绍源码中会涉及到一些更基础的操作:
- runtime_canSpin, runtime_doSpin 自旋相关的操作
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
// 总体意思就是尽量小的浪费计算资源
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {// 顾名思义 空闲状态的p+处于自选状态的m(可以认为是处于工作状态的p吧 源码里很多地方都是这么用的)
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
runtime_doSpin最终会调用下面这个函数 整体意思就是cpu调用30次pause指令
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
- CAS操作最终会调用下面的函数 重点关注 LOCK和CMPXCHGL指令
CMPXCHGL可以相当于一个原子的Load-Compare-Store操作,具体就是处理器(x86)提供的原子执行 读—比较—存 操作
LOCK指令加上MESI协议可以实现在多核环境下的独占总线操作,同时只有一个核心操作内存,而在同一个核心中Load-Compare-Store指令是不会被打断的
所以在多核多线程的环境下CAS操作是原子的
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// }else
// return 0;
TEXT ·Cas(SB), NOSPLIT, $0-13
MOVL ptr+0(FP), BX
MOVL old+4(FP), AX
MOVL new+8(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+12(FP)
RET
runtime_SemacquireMutex和runtime_Semrelease
基本是操作系统的对信号量的pv操作的逻辑,并且还涉及runtime的一些逻辑,包括管理阻塞协程,如何让出时间片等以后再细说。目前这里需要了解的就是调用runtime_Semrelease会使某个阻塞在runtime_SemacquireMutex的协程可以继续执行下去-
下面是整体的代码
加锁
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. 尝试直接获取锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled { // 竞态检测代码
race.Acquire(unsafe.Pointer(m))
}
return
}
// 没有直接获取到锁
// Slow path (outlined so that the fast path can be inlined) 函数少于80个节点也没有for select 等语句
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 用于判断当前goroutine是否要尝试让锁进入或者退出饥饿模式
awoke := false // 表示当前goroutinue是否是被唤醒的
iter := 0 // 可以进行自旋的轮次
old := m.state // 重新获取一次锁状态
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 如果锁还处于锁定的状态并且锁是正常模式并且还能自旋则进行自旋
// 前面说过 饥饿模式下新到来的goroutine不需要去自旋 锁的控制权直接从解锁的goroutine交给了被唤醒的waiter
// 这里应该可以认为如果当前goroutine一直没获取到锁的控制权则还属于是刚到的goroutine 因为一旦获取到了锁的控制权则只可能是获取到锁后直接返回或者阻塞后被唤醒两种情况
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 主动自旋 尝试获取mutexWoken标志位 告诉Unlock不需要唤醒更多的goroutinue了 已经有一个在自旋中的goroutinue在等着获得锁了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 设置锁状态为有goroutine被唤醒
awoke = true
}
runtime_doSpin() // 进行自旋
iter++
old = m.state
continue
}
new := old // 重新获取一次锁状态
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 { // 饥饿模式下当前goroutine不需要尝试加锁
new |= mutexLocked
}
// 饥饿模式下或者锁已经被其他goroutine获取的情况下 需要加入等待队列
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 尝试把锁改为饥饿模式还需要锁当前处于被其他goroutinue获取的状态
// 如果当前goroutinue把锁设置为饥饿模式,则Unlock的地方
// 饥饿模式下 所有goroutinue都是从队列中唤醒 并且唤醒的waiter都是直接获取到锁退出 不会走到这。对于刚到的goroutinue starving不会为true
// 正常模式下 starving为true一定是当前goroutinue已经是队列最后一个waiter了(这时候是fifo) 并且已经阻塞超过1ms了。
// 并且在正常模式下才需要设置锁为饥饿模式
// 所以正常情况下 此时等待队列中都没有waiter了。如果在锁没有被获取的情况下设置为饥饿模式会导致Unlock的地方有错误(无法唤醒waiter,压根就没有)
if starving && old&mutexLocked != 0 {
new |= mutexStarving //
}
if awoke { // 当前goroutine是被唤醒的 它负责清空清空唤醒标志位
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 清空唤醒标志位 与加异或操作会把左边数字中和右边数字中为1的位清空
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 { // 抢到锁控制权之前锁是 没加锁状态和正常模式 说明很顺利 就直接退出
break // locked the mutex with CAS 获取锁成功
}
// 走到这说明当前goroutinue没有抢到锁或者处于锁饥饿模式(饥饿模式的goroutinue都要进队列)
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0 // 控制是头插法还是尾插法
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 看等待的时间是否超过是1ms
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 走到这说明当前goroutinue是个被唤醒的waiter 根据Unlock的逻辑 此时mutexWoken标志位只可能是0
delta := int32(mutexLocked - 1<<mutexWaiterShift) // 所以这里要设置锁已经被获取到的标志和队列数量减1
if !starving || old>>mutexWaiterShift == 1 { // 如果不饥饿了或者当前goroutinue是队列中最后一个了(减完就没有了)
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving // 退出饥饿模式
}
atomic.AddInt32(&m.state, delta)
break // 获取锁成功
}
awoke = true
iter = 0
} else {
old = m.state // 没抢到锁的控制权 继续循环
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解锁
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) // 直接改标志位
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 正常模式下队列空了就可以直接返回
// 第一次走到这 mutexLocked位=0
// 加锁的自旋操作中如果设置了mutexWoken位=1 则这里也可以直接返回了
// 第一次走到这 mutexStarving位=0
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 走到这就要尝试唤醒一个waiter了
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken // 走到这锁是在正常模式下并且队列不为空 减队列操作在这里执行
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1) // 唤醒一个waiter 这里不需要让渡时间片 因为可以退出锁执行其他的业务代码了
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式下
runtime_Semrelease(&m.sema, true, 1) // 唤醒一个waiter 让渡时间片 直接到这个waiter的runtime_SemacquireMutex后执行
}
}
未完待续