并发编程中经常会出现竞争条件和竞争数据的问题,所以需要将代码段设为临界区,通过使用mutex将代码段保护起来。
sync.Mutex
mutex是一种互斥锁,用来控制多线程对共享资源竞争访问的一种同步机制。
Mutex的零值是未锁定的mutex,也就是说mutex不用初始化赋值,另外因为它的state字段存储了锁定,唤醒,饥饿等状态以及等待的waiter数量,所以它是不可复制的。通过channel也可以实现更高级的同步。
Mutex的实现
源码中有一段关于锁的描述如下:
// Mutex can be in 2 modes of operations: normal and starvation.// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
翻译过来就是:
互斥体可以有两种操作模式:正常模式和饥饿模式。
在正常模式下,等待者按照 FIFO 顺序排队,但是被唤醒的等待者不拥有互斥量,而是与新到达的 goroutine 竞争所有权。新到达的 goroutine 有一个优势——它们已经在 CPU 上运行,并且可能有很多,所以醒来的waiter很有可能失败。在这种情况下,它会排在等待队列的前面。如果等待者在超过 1 毫秒内未能获取互斥体,则会将互斥体切换到饥饿模式。
在饥饿模式下,互斥锁的所有权直接从解锁 goroutine 移交给队列前面的waiter。新到达的 goroutine 不会尝试获取互斥锁,即使它看起来已解锁,也不会尝试旋转。相反,它们将自己排在等待队列的末尾。如果某个等待者收到互斥体的所有权,并发现 (1) 它是队列中的最后一个等待者,或者 (2) 它等待的时间少于 1 毫秒,则它将互斥体切换回正常操作模式。
普通模式具有相当好的性能,因为即使存在阻塞的等待者,goroutine 也可以连续多次获取互斥锁。饥饿模式对于防止尾部延迟的病理情况很重要。
mutex的结构体很简单,只有2个字段,其中sema是信号量,用于唤醒/阻塞goroutine,而state被分为了四个部分,是否加锁,唤醒,饥饿模式,阻塞等待的Waiter数量,其中前3个都占一个标志位,剩下的位则是waiter数量。
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
按照上面源码中的注释,分析下代码
func (m *Mutex) Lock() {
// 快速路径:很轻松就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 慢速路径
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 是否饥饿
awoke := false // 是否唤醒
iter := 0 // 自选次数
old := m.state
for {
// 非饥饿模式才可以自旋,因为饥饿模式下所有权都会直接交给waiter(看上面注释里说的饥饿模式下的处理)
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试设置 mutexWoken 标志来通知 Unlock 不要唤醒其他阻塞的 goroutine。
// 这个就是上面说的正常模式下,会唤醒当前请求锁的,但又不直接获取锁,只是设置一个标志参与竞争,然后因为已经唤醒一个,就不会唤醒其他的了,所以也阻止了,因为unlock导致唤醒其他goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 只有在非饥饿模式下才会尝试获取锁,饥饿模式下则是直接交给waiter,新的goroutine 必须排队,所以这里判断了下不是饥饿模式,才设置锁的标志。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果锁已经加锁或者处于饥饿模式,就得排队,等待的数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 当前的 goroutine 将互斥锁要切换到饥饿模式,只在当前锁已经是加锁状态才会切换,
// 如果互斥锁当前不是已加锁的状态(已解锁),则不要进行切换,new不会设置饥饿模式的状态位
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 唤醒标志
if awoke {
// 清除唤醒标志
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 设置新的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 非饥饿模式下或者没加锁的情况下成功获取锁
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 下面处理饥饿模式
// 如果已经有在等待的wiater,则直接加入队头,否则就加入队尾,
// queueLifo为true则加入对头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待被唤醒
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒后如果有waiter等待的时间超过1秒,则代表处于饥饿状态,准备切换饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果被唤醒后,当前锁已经处于饥饿模式则进入if代码段,否则参与竞争
if old&mutexStarving != 0 {
// 不正长的状态,锁已设置或已唤醒,没有等待唤醒的都属于不正常的状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// delta的默认是设置锁的位,并且将等待的waiter减去1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果是最后一位等待的waiter,或者waiter等待的时间少于1秒了,则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式,将delta 减去mutexStarving
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
加锁的流程:
- 如果走快速路径加锁成功(没有其他goroutine获取过锁),则直接返回,否则进入慢路径
- 首先进行自旋,自旋的条件是如果已经上锁,且处于正常模式,然后唤醒正在等待的goroutinue(也就是自己),设置唤醒状态位,参与正常模式的竞争。(自旋的目的是为了短临界区的代码释放锁后,可以快速获得锁,而不用经过休眠,再唤醒排队等待,减少goroutine切换的开销)。
- 自旋结束(饥饿模式/超过最大自旋次数/已经解锁)
- 如果不是饥饿模式,则设置加锁标志位,
new |= mutexLocked
- 如果是饥饿模式或者已经加锁的,就继续排队,
new += 1 << mutexWaiterShift
(饥饿模式下新来的goroutine加入队尾排队,已经加锁的也是需要排队等待) - 如果当前锁已经饥饿(还未切换饥饿模式),且已经被加锁,则切换饥饿模式,设置饥饿的标志为位,
new |= mutexStarving
- 如果是已经被唤醒的,则清除唤醒的标志位。
- 如果不是饥饿模式,则设置加锁标志位,
- 利用cas 将old更新为new,设置了新的状态,不一定是加锁成功。
atomic.CompareAndSwapInt32(&m.state, old, new)
,如果是以前没加锁也不是饥饿模式,那肯定代表了加锁成功,直接退出,否则就开始处理饥饿模式。 - 是否有已经在等待的waiter
- 等待时间不为0(参与竞争的goroutine),则直接加入队头
- 没有已经在等待的,加入队尾,饥饿模式下,所有新来的都加入队尾排队。
- 入队阻塞等待唤醒
- 有被唤醒后,根据等待时间判断是否已处于饥饿状态,更新饥饿的状态,等待下一轮将设置为饥饿模式。
if starving && old&mutexLocked != 0 { new |= mutexStarving }
- 如果当前锁已经是饥饿模式
- 设置锁的标志位,将等待的waiter减去1,也就是减去自己。
- 判断能否退出饥饿模式,如果是最后一位等待者或者唤醒后的不处于饥饿状态(等待时间不超过1秒钟),则清除饥饿标志位
- 更新锁状态state。饥饿模式下直接持有锁,退出。
- 如果当前锁没有处于饥饿模式,则参与竞争,进入下一次循环。
读写锁
go 读写锁一般用在读多写少的场景,相比较mutex,性能会好很多。
它包含6个方法,RLock
,RUnlock
,Lock
,Unlock
,TryRLock
,RUnlock
,根据读/写不同的场景调用不同的方法就可以。
数据结构如下:
type RWMutex struct {
w Mutex // 等待或持有的writer
writerSem uint32 // writer等待reader释放锁的信号量
readerSem uint32 // reader等待writer释放锁的信号量
readerCount int32 // reader的数量(包括持有读锁的reader以及等待读锁的reader)
readerWait int32 // 持有读锁的reader数量
}
const rwmutexMaxReaders = 1 << 30 // 最大reader的数量
RLock
方法只有几行代码,当请求读锁的时候,第一行代码对readerCount进行了原子写+1,readerCount小于0则等待被唤醒,说明此时有一个writer持有写锁或者读锁没有被释放导致新进来的写锁得不到锁,所以接下来的读锁需要等待。
readerCount 的类型是int32,这里判断小于0说明是有负数出现的情况,这是因为在写锁的时候会把readerCount进行反转成负数。
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
RUnlock
也只有2行代码,用于解除单个读锁,不会影响其他读锁。解锁时对readerCount减1.
当readerCount大于0时,解锁成功。
当readerCount是负数时,说明有写锁正在进行,阻塞等待,对当前持有锁的reader,readerWait减1,如果是最后一个,则唤醒处于等待状态的writer
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
Lock
是写锁,RWMutex的w类型是Mutex,rw.w.Lock()
保证如果有多个访问Lock方法,只有一个能获取到写锁,其余的都会阻塞在这行代码。
接下来对readerCount进行反转变成负数,再加上rwmutexMaxReaders,然后判断r是否等于0,同时对rw.readerWait原子写加上r,这里指的是,如果在写锁期间有调用了Rlock,则readerCount值会发生变化,先减rwmutexMaxReaders再加,r的结果就不一定是0,另外即使readerCount的值先减为了负数,此时有读锁进来,对负数加1,也不会影响这里最终r的值。这也是上面RLock
方法里为什么会判断readerCount+1的值是负数的原因。
这样做的好处是调用RLock
的时候通过判断负数就知道有正在请求或者持有的写锁,需要等待。
那么当r不等于0时,说明有持有读锁的reader,对readerWait加上r,然后将当前请求的写锁加入队列,等待被唤醒。当r等于0时,写锁成功。
readerWait的值在请求读锁的时候并没有做处理,只是对readerCount+1,如果没有写锁,那读锁解锁的时候就直接减1解锁成功了。但如果readerCount为负数了,则肯定有写锁在等待被唤醒或者已经拿到锁了,而在写锁请求的时候就会把readerCount的值拷贝到readerWait上(当然这期间很可能readerCount的值又增加了(不可能减少的,因为释放读锁的请求肯定会在这次写锁之后了),但增加的reader肯定在写锁释放之后了,等待下一次有写入会继续拷贝,没有写锁,那对读锁释放会直接从readerCount上减),表示等待或者持有读锁的数量,这样在解锁读锁的时候就直接对readerWait减1。简单来说readerCount就是用来判断读锁请求的时候是否有写锁存在,否则怎么在请求读锁的时候,知道是否有写锁。
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
Unlock
解锁,解锁和加锁不与特定的goroutine绑带,也就是说可以一个go程加锁,另一个解锁,和 mutex一样。
readerCount加上rwmutexMaxReaders得到还有多少个读锁在等待,如果有,紧接着将所有读锁唤醒,释放写锁。
如果在读锁唤醒,释放锁后还会有等待的写锁会怎样呢?写与写肯定是阻塞的,当已有写锁的时候,此时如果再有写锁会阻塞在RWMutex.Lock方法中的rw.w.Lock()
这一行,但如果此时有读锁进来,则会阻塞等待被唤醒。这样,即使后边进来的写锁发生在读锁之前,但依然会先处理读锁,释放后才会唤醒写锁,而写锁在第一个写锁释放后,根据RWMutex.Lock
方法的情况会阻塞等待唤醒,直到读锁释放后唤醒它。
从这里也可以看出,当有一个写锁的时候,如果此时有另一个写锁加锁请求,也有其他的读锁加锁请求,那么在释放写锁时,首先处理的是读锁,完了才是第二个写锁。
在持有写锁的期间readerCount值肯定是个rwmutexMaxReaders反转后的负数,那么加上rwmutexMaxReaders,如果有等待读锁的reader,得到的肯定是一个小于rwmutexMaxReaders的数。
比如,有2个读锁,在写锁加锁的时候,readerCount = 3-1073741824(rwmutexMaxReaders) =-1073741821,在写锁解锁的时候,根据解锁第一行的代码,r = -1073741821+1073741824 = 3
那么足以说明,如果有等待的reader,那么r的值一定是小于rwmutexMaxReaders的,如果大于等于,说明没有等待的reader,解锁就进入到下面throw抛出来的错误这里。
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
总结如下:
- 没有读锁,也没有写锁
- 新来的读锁会立即获得锁
- 新来的写锁会立即获得锁
- 有读锁,没有写锁
- 新来的读锁会立即获得锁
- 新来的写锁会阻塞等待读锁释放
- 有写锁,没有读锁
- 新来的读锁会阻塞等待写锁释放
- 新来的写锁会阻塞等待写锁释放
- 有写锁,读锁在阻塞等待的时候
- 新来的读锁会阻塞等待上面的写锁释放
- 新来的写锁会阻塞等待上面的读锁释放
- 如果同时读锁和写锁都进来了,则上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁
- 有读锁,写锁在阻塞等待的时候
- 新来的读锁会阻塞等待,写锁拿到锁释放后才会处理读锁
- 新来的写锁会阻塞等待,写锁拿到锁释放后才会处理写锁
- 如果同时有读锁和写锁进来了,则在上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁