环境
go1.20
源码地址mutex.go
目录
基础概念
- 并发:单位时间内(一分钟一秒钟)多个任务同时进行,任务可能是交替执行(单核情况),如一个人一分钟吃三个馒头
- 并行:同时执行多个任务,每个任务在不同处理器上执行,如三个人同时吃三个馒头
- 临界区:指为了避免并发访问,导致程序中的部分代码被并发访问或修改,将这部分代码保护起来,这部分代码就是临界区,即共享资源,最经典例子的就是num++(并不是一个原子操作,至少包含先读取再+1,最后保存的三个动作)
// count++操作的汇编代码
MOVQ "".count(SB), AX
LEAQ 1(AX), CX
MOVQ CX, "".count(SB)
GO race detector工具
是go提供用于检测并发访问共享资源是否有问题的工具,目前已经集成在工具链中,在编译、测试、运行时加上race参数即可
go test -race my_test.go
1.锁
主要思想:当临界区由一个线程持有,其他线程想进入这个临界区时就返回失败或等待,直到持有的线程退出临界区思考
- 锁与临界区的推荐用法,加锁原则是什么
- 读写锁是什么锁优先,举例同进程使用不当造成死锁的情况(至少二种)
- double-check的使用场景举例
- 锁的零值代表什么
- mutex的state字段有几个意义,分别表示什么
- mutex的最大等待协程数是多少
1.1 锁的分类和使用
锁根据粒度分为Mutex和RWMutex,RW又叫读写锁。一般将其和临界区封装在同一个结构体当中一起使用
type safeResource struct {
resource map[string]string
lock sync.Mutex
}
//不推荐下面这种写法
//因为调用方不一定会阅读源码,
//不一定知道锁的存在
var PublicResource map[string]string
var PublicLock sync.RWMutex
go的读写锁RWMutex是写锁优先的,即读锁可以重复加,但是当加写锁时,无法加读锁
读锁互不影响
func TestMutex(t *testing.T) {
PublicLock.RLock()
defer PublicLock.RUnlock()
fmt.Print("第一次加锁")
PublicLock.RLock()
defer PublicLock.RUnlock()
fmt.Print("第二次加锁,不用等待上个读锁释放")
}
读写锁互相冲突
func TestMutex(t *testing.T) {
PublicLock.RLock()
defer PublicLock.RUnlock()
fmt.Print("第一次加锁")
PublicLock.Lock()
fmt.Println("有读锁了无法加写锁")
defer PublicLock.Unlock()
}
//第一次加锁
//panic: test timed out after 30s
写锁优先
func TestMutex(t *testing.T) {
PublicLock.Lock()
fmt.Println("添加写锁")
defer PublicLock.Unlock()
PublicLock.RLock()
defer PublicLock.RUnlock()
fmt.Print("尝试加读锁")
}
//添加写锁
//panic: test timed out after 30s
1.2加锁原则
- 多个g(协程)只读不加锁
- Mutex只要加上就会锁住适合写多读少,RWMutex适合写少读多
1.3 双重检查double-check
扩展阅读-java双重检查锁的案例,即保证先行发生关系,防止在一个协程中看到另一个协程执行一半的情况
- 案例1,给一个线程安全的map增加一个添加键值对的方法,若存在则返回旧值
type safeMap[K comparable, V any] struct {
values map[K]V
lock sync.RWMutex
}
// g1 设置 key1 = 1
// g2 设置 key1 = 2
func (s *safeMap[K, V]) LoadOrStore(key K, newVal V) (val V, loaded bool) {
//读锁不互相冲突,两协程可能都获取到key1未设置的信息
s.lock.RLock()
oldVal, ok := s.values[key]
s.lock.RUnlock()
if ok {
return oldVal, true
}
// 添加写锁有先后顺序假设g1先获取
// key1 = 1 返回 1,false
// g2后进入,按逻辑来说应该是key1已经被设置过了
//返回 1 ,true
// 实际 确是 key2=2 返回2,false
s.lock.Lock()
defer s.lock.Unlock()
s.values[key] = newVal
return newVal, false
}
为达到我们预期效果,有两种方法:
- 全程使用mutex
- 使用双重检查,即在加写锁后再读一次数据
//第二次加锁,即若不存在则赋值逻辑
s.lock.Lock()
defer s.lock.Unlock()
// g1先进了 key1 =1
// g2后进来 key1 =2 覆盖了
// 所以用两次检测 double-check
oldVal, ok = s.values[key]
if ok {
return oldVal, true
}
s.values[key] = newVal
return newVal, false
2.源码阅读
Locker接口定义了锁同步原语的方法集
type Locker interface {
Lock()
Unlock()
}
源码文件开头定义了几个常量
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
这几个常量分别代表锁的几种状态:被加锁、正常工作、饥饿
那么值为0代表什么?不就是没加锁嘛,因此使用锁不需要初始化,直接声明变量即可
var mu sync.Mutex
即零值就是未上锁,同理嵌入到结构体中/嵌入字段也是零值,不需要初始化该字段
2.1Mutex
Mutex是对Locker的实现
type Mutex struct {
state int32
sema uint32
}
- state是控制锁状态的核心
加锁解锁就是把state修改为某个值 - sema是用来处理沉睡、唤醒的信号量 依赖于两个runtime调用:
- runtime_SemacquireMutex: sema+1并挂起goroutine
- runtime_Semrelease : sema-1 并唤醒一个g
2.1.1锁的发展阶段
实际上mutex锁有4个发展阶段,知道如何演进能帮助我们更好理解代码
对于初版,上面两个字段就足以实现,初版lock方法逻辑也很简单:
- state==1,表示锁已持有,其余g等待
- state==0,表示可以上锁,通过CAS(compare-and-swap)设置=1
第二版时state不是一个单纯的01值,而是一个32位复合型字段:
- 最后1位表示是否被持有
- 倒数第二位是唤醒标记
-
前几位表示有多少g等待这把锁
此时最大等待协程数为2^30-1
lock方法逻辑也变复杂,使用一个for循环不断尝试获取,获取不到就休眠,休眠醒来继续for循环抢锁
第三版为了多给机会,在for循环前加入自旋机制,在尝试一定自选次数后再for循环,即新来的有更多机会拿锁
第四版就是防止等待的g一直拿不到,加入饥饿机制饥饿模式的阈值大约是1毫秒
starvationThresholdNs = 1e6
下面就是完整源码阅读
2.1.2加锁和解锁
首先明确锁是由协程g来抢的
Lock方法
// mutexLocked = 1
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
//这部分不重要跳过就行
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
compareAndSwapInt32就是交换值等价于以下代码
if m.state== 0 {
m.state = 1
return true
}
return false
所以Lock检查锁的状态state:
- 为0时,即没有人持有锁,将它设置为1,表示该g持有这把锁
- 不为0,调用lockSlow,g进入自旋状态尝试拿锁
当一个线程尝试获取某一把锁,若此时锁已经被占领(state不为0),那么该线程无法获取到这把锁,该线程会等待,隔一段时间后再次尝试获取,这种采用循环 加锁->等待的机制被称为自旋锁。
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻
lockSlow过于复杂,这里简化一下
//mutexLocked = 1
//mutexStarving = 4
func (m *Mutex) lockSlow() {
old := m.state
iter := 0
for {
//判断当前g能否进入自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//进入自旋并更新值,
//自旋能抢到锁就直接退出了,这是CPU层面控制的
runtime_doSpin()
iter++
old = m.state
continue
}
//自旋次数满了之后,失败不能继续自旋,执行下面代码
//更新state字段中存储的不同信息,尝试去获取锁
//state是int32类型,占32位4个字节,下面都是关于字节的运算
// new为锁的新状态
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
//计算新的状态后使用CAS函数更新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//第一种情况:自旋完后获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
//第二种情况:没抢到进入等待队列
//该方法会不断尝试获取锁并陷入休眠等待信号量的释放
//一旦当前可以获取信号量则立即返回,被锁住的代码执行即sync.Mutex.Lock()
//这也是一个CPU层面执行的代码,即使被唤醒也要和新来的g抢锁
//抢锁规则取决于锁的模式
//饥饿模式下该g会直接获得锁
// 正常模式下公平竞争
//抢不到再一次入队等待信号量
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//根据等待时间来修改锁的状态,一般超过1ms就进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
//饥饿模式下不管新来的g,只消耗队列中的g
//队列中的g等待时间小于1ms也会切回正常模式
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果等待队列中只存在当前一个g,修改锁状态,退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
runtime_canSpin返回true的条件为:
1.运行在多cpu机器上面
2.当前g为了获取锁进入自旋的次数小于4次
3.当前机器至少存在一个正在运行的处理器P且处理的运行队列为空
runtime_doSpin()是执行CPU层面的指令,该指令会占用并消耗CPU时间
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
小结一下加锁:
- 先是快速路径尝试,直接CAS尝试修改Mutex.state = mutexLocked加锁
- 当新来的g发现锁被锁住mutexLocked且在普通模式下,进入自旋,消耗CPU时间等待锁释放(这意味着新来的g有更多机会拿到锁)。自旋是拿锁的一种方式是快路径,通过控制次数来退出自旋,自旋失败后加入等待队列,这是慢路径
3.被唤醒后没人持有锁就直接给你,否则抢锁,抢锁是不公平的!,根据锁的模式来决定,饥饿模式mutexStarving就直接给他,正常模式就公平的和新来的g(在自旋中的g)抢锁,没抢到就回去重新排队,等待时间决定了是否进入饥饿模式,一般超过1ms,如果当前g是最后一个等待的,或者等待时间小于1ms切回正常模式
//mutexLocked = 1
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
- 返回0就说明解锁成功了
- 不为0就调用unlockSlow ,为什么会不为0,因为锁有正常模式和饥饿模式都会修改Mutex.state的值呀
func (m *Mutex) unlockSlow(new int32) {
if new&mutexStarving == 0 {
//处理正常模式
} else {
//处理饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
这里我们同时看到锁上并没有记录被哪个协程持有,这意味着unlock方法可以被任意g调用(造成死锁的原因),同时对一个未上锁的锁调用解锁会引发panic
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
因此使用锁需要遵循“谁申请谁释放原则”
2.1.3扩展mutex
扩展一下让锁知道被哪个协程使用
思路:
- 通过runtime.stack获取协程唯一id记录到锁中,如果释放时发现不是当前id释放就panic
- 每个协程生成唯一token标识唯一性
2.2RWMutex
RWMutex 跟cancelCtx类似对Mutex使用装饰模式
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
- sem见过了,用于等待读写操作
- readerCount 正在执行读的操作数量,readerCount是负值的时候意味着此时有写请求等待(当第一个请求是先写请求过来时,readerCount直接被置为一个很小的负数)
- readerWait 正在写被阻塞时等待的读操作数量
2.2.1写锁的加锁和解锁
调用w.Lock 用互斥锁的形式阻塞后续操作,因为互斥锁被获取,后续g获取时只能自旋或排队
//readerCount atomic.Int32
func (rw *RWMutex) Lock() {
rw.w.Lock()
// 原子操作方式获取持有读锁的数量
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
//如果其他g持有该互斥锁的读锁,
//进入等待等所有读操作结束后发送释放信号量,唤醒该g
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
解锁,也是对互斥锁的解锁方法进行封装
func (rw *RWMutex) Unlock() {
// 因为前面加锁时减去了rwmutexMaxReaders
// 这里把它加回来变为正数
r := rw.readerCount.Add(rwmutexMaxReaders)
// 循环释放因为读锁陷入等待的g,
//读锁之间不冲突,不竞争,谁先醒都无所谓
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
}
2.2.2读锁的解锁和加锁
加锁,通过上面对写锁的学习我们只读,写锁在加锁时会将readerCount设置为负数(将自己加入),这里负数就说明存在写锁,该g进入休眠等待
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
释放读锁就减1,因为读锁会卡住写锁加锁(这里实际是让第一个拿到写锁的g进入睡眠),因此当readerCount为负数需要发起唤醒信号量(大于等于0说明解锁成功)
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
//减少写锁等待的读锁数量,唤醒第一个拿到写锁的协程g
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
小结:
- 实际上只有写锁才是真正对互斥锁的操作
- 读锁大多负责休眠和唤醒g,不碰互斥锁,因此读锁之间不互相竞争
2.3 trylock
RW的trylock都是对Mutex的trylock方法进行封装
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}
就是进行一次的尝试修改 state,也不执行lockslow进入自旋
3.锁的错误使用场景
3.1死锁deadlock
例子1:锁不只有一把,一个协程在已经对一个资源A加锁的情况下,尝试对另一个资源B加锁,资源B已经被另一个协程上锁,好死不死,掌握资源B锁的协程也要对资源A加锁,就陷入了两个
协程互相无限等待释对方放锁的情况
type Student struct {
sync.RWMutex
ID int
Name string
}
func TestDeadLock(t *testing.T) {
s1 := &Student{ID: 1, Name: "s1"}
s2 := &Student{ID: 2, Name: "s2"}
var wg sync.WaitGroup
wg.Add(2)
go func() {
s1.Lock()
defer s1.Unlock()
time.Sleep(time.Second)
t.Log("s1名字叫:", s1.Name)
t.Log("我要去获取s2的名字")
s2.Lock()
t.Log(s2.Name)
s2.Unlock()
wg.Done()
}()
go func() {
s2.Lock()
defer s2.Unlock()
time.Sleep(time.Second)
t.Log("s2名字叫:", s2.Name)
t.Log("我要去获取s1的名字")
s1.Lock()
t.Log(s1.Name)
s1.Unlock()
wg.Done()
}()
wg.Wait()
}
触发死锁
解决办法:换成原子操作sync.atomic
例子2 :锁只有一把, 函数A ->对资源A加锁-> 调用函数B,函数B也要对A加锁,那么就要等待A解锁,两个也是互相等待,即对一把锁重复上锁,也叫对锁的重入
func A(s *Student) {
s.Lock()
defer s.Unlock()
B(s)
}
func B(s *Student) {
s.Lock()
defer s.Unlock()
fmt.Println(s.Name)
}
func TestChanDeadlock(t *testing.T) {
s := &Student{Name: "s"}
A(s)
}
原因我们阅读过源码知道,锁上面不记录持有锁的协程信息,只修改state
3.2解锁和上锁不在同一个方法体当中
就怕万一忘记释放锁,最好的办法还是在上锁后加defer解锁,即使发生panic也能确保解锁
3.3尝试对锁进行拷贝
也是类似于上面死锁案例2的情况
func foo(s Student) {
s.Lock()
defer s.Unlock()
fmt.Println("in foo")
}
func TestFoo(t *testing.T) {
var c Student
c.Lock()
defer c.Unlock()
foo(c)
}
foo不知道已经上锁了,尝试用lock来获取锁(但是没有其他协程来释放这个赋值的锁),结果主协程被完全阻塞
3.4 go vet工具
利用 vet检测死锁
go vet demo.go
# command-line-arguments
./demo.go:20:9: call of foo copies lock value: command-line-arguments.Counter
./demo.go:25:12: foo passes lock by value: command-line-arguments.Counter