Go阅读-Sync包-同步原语与锁Mutex

环境

go1.20
源码地址mutex.go

目录

汇总

基础概念

  • 并发:单位时间内(一分钟一秒钟)多个任务同时进行,任务可能是交替执行(单核情况),如一个人一分钟吃三个馒头
  • 并行:同时执行多个任务,每个任务在不同处理器上执行,如三个人同时吃三个馒头
  • 临界区:指为了避免并发访问,导致程序中的部分代码被并发访问或修改,将这部分代码保护起来,这部分代码就是临界区,即共享资源,最经典例子的就是num++(并不是一个原子操作,至少包含先读取再+1,最后保存的三个动作)
 // count++操作的汇编代码
    MOVQ    "".count(SB), AX
    LEAQ    1(AX), CX
    MOVQ    CX, "".count(SB)

GO race detector工具

是go提供用于检测并发访问共享资源是否有问题的工具,目前已经集成在工具链中,在编译、测试、运行时加上race参数即可

go test -race my_test.go

1.锁

主要思想:当临界区由一个线程持有,其他线程想进入这个临界区时就返回失败或等待,直到持有的线程退出临界区
image.png

思考

  • 锁与临界区的推荐用法,加锁原则是什么
  • 读写锁是什么锁优先,举例同进程使用不当造成死锁的情况(至少二种)
  • double-check的使用场景举例
  • 锁的零值代表什么
  • mutex的state字段有几个意义,分别表示什么
  • mutex的最大等待协程数是多少

1.1 锁的分类和使用

锁根据粒度分为Mutex和RWMutex,RW又叫读写锁。一般将其和临界区封装在同一个结构体当中一起使用

type safeResource struct {
    resource map[string]string
    lock     sync.Mutex
}

//不推荐下面这种写法
//因为调用方不一定会阅读源码,
//不一定知道锁的存在
var PublicResource map[string]string
var PublicLock sync.RWMutex

go的读写锁RWMutex是写锁优先的,即读锁可以重复加,但是当加写锁时,无法加读锁
读锁互不影响

func TestMutex(t *testing.T) {
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第一次加锁")
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第二次加锁,不用等待上个读锁释放")
}

读写锁互相冲突

func TestMutex(t *testing.T) {
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第一次加锁")
    PublicLock.Lock()
    fmt.Println("有读锁了无法加写锁")
    defer PublicLock.Unlock()
}
//第一次加锁
//panic: test timed out after 30s

写锁优先

func TestMutex(t *testing.T) {
    PublicLock.Lock()
    fmt.Println("添加写锁")
    defer PublicLock.Unlock()
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("尝试加读锁")
}

//添加写锁
//panic: test timed out after 30s

1.2加锁原则

  • 多个g(协程)只读不加锁
  • Mutex只要加上就会锁住适合写多读少,RWMutex适合写少读多

1.3 双重检查double-check

扩展阅读-java双重检查锁的案例,即保证先行发生关系,防止在一个协程中看到另一个协程执行一半的情况

  • 案例1,给一个线程安全的map增加一个添加键值对的方法,若存在则返回旧值
type safeMap[K comparable, V any] struct {
    values map[K]V
    lock   sync.RWMutex
}
// g1 设置 key1 = 1
// g2 设置 key1 = 2
func (s *safeMap[K, V]) LoadOrStore(key K, newVal V) (val V, loaded bool) {
//读锁不互相冲突,两协程可能都获取到key1未设置的信息
    s.lock.RLock()
    oldVal, ok := s.values[key]
    s.lock.RUnlock()
    if ok {
        return oldVal, true
    }
// 添加写锁有先后顺序假设g1先获取
// key1 = 1 返回 1,false
// g2后进入,按逻辑来说应该是key1已经被设置过了
//返回 1 ,true
// 实际 确是 key2=2 返回2,false
    s.lock.Lock()
    defer s.lock.Unlock()

    s.values[key] = newVal
    return newVal, false
}

为达到我们预期效果,有两种方法:

  • 全程使用mutex
  • 使用双重检查,即在加写锁后再读一次数据
    //第二次加锁,即若不存在则赋值逻辑
    s.lock.Lock()
    defer s.lock.Unlock()
    // g1先进了 key1 =1
    // g2后进来 key1 =2 覆盖了
    // 所以用两次检测 double-check
    oldVal, ok = s.values[key]
    if ok {
        return oldVal, true
    }

    s.values[key] = newVal
    return newVal, false

2.源码阅读

Locker接口定义了锁同步原语的方法集

type Locker interface {
    Lock()
    Unlock()
}

源码文件开头定义了几个常量

    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving

这几个常量分别代表锁的几种状态:被加锁、正常工作、饥饿
那么值为0代表什么?不就是没加锁嘛,因此使用锁不需要初始化,直接声明变量即可

var mu sync.Mutex

零值就是未上锁,同理嵌入到结构体中/嵌入字段也是零值,不需要初始化该字段

2.1Mutex

Mutex是对Locker的实现

type Mutex struct {
    state int32
    sema  uint32
}
  • state是控制锁状态的核心
    加锁解锁就是把state修改为某个值
  • sema是用来处理沉睡、唤醒的信号量 依赖于两个runtime调用:
    • runtime_SemacquireMutex: sema+1并挂起goroutine
    • runtime_Semrelease : sema-1 并唤醒一个g

2.1.1锁的发展阶段

实际上mutex锁有4个发展阶段,知道如何演进能帮助我们更好理解代码


image.png

对于初版,上面两个字段就足以实现,初版lock方法逻辑也很简单:

  • state==1,表示锁已持有,其余g等待
  • state==0,表示可以上锁,通过CAS(compare-and-swap)设置=1

第二版时state不是一个单纯的01值,而是一个32位复合型字段:

  • 最后1位表示是否被持有
  • 倒数第二位是唤醒标记
  • 前几位表示有多少g等待这把锁
    image.png

    此时最大等待协程数为2^30-1
    lock方法逻辑也变复杂,使用一个for循环不断尝试获取,获取不到就休眠,休眠醒来继续for循环抢锁

第三版为了多给机会,在for循环前加入自旋机制,在尝试一定自选次数后再for循环,即新来的有更多机会拿锁

第四版就是防止等待的g一直拿不到,加入饥饿机制
image.png

饥饿模式的阈值大约是1毫秒

starvationThresholdNs = 1e6

下面就是完整源码阅读

2.1.2加锁和解锁

首先明确锁是由协程g来抢的
Lock方法

// mutexLocked = 1
func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        //这部分不重要跳过就行
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    m.lockSlow()
}

compareAndSwapInt32就是交换值等价于以下代码

    if m.state== 0 {
        m.state = 1
        return true
    }
    return false

所以Lock检查锁的状态state:

  • 为0时,即没有人持有锁,将它设置为1,表示该g持有这把锁
  • 不为0,调用lockSlow,g进入自旋状态尝试拿锁

当一个线程尝试获取某一把锁,若此时锁已经被占领(state不为0),那么该线程无法获取到这把锁,该线程会等待,隔一段时间后再次尝试获取,这种采用循环 加锁->等待的机制被称为自旋锁。

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻

lockSlow过于复杂,这里简化一下

//mutexLocked = 1 
//mutexStarving = 4
func (m *Mutex) lockSlow() {
    old := m.state
    iter := 0
    for {
//判断当前g能否进入自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
//进入自旋并更新值,
//自旋能抢到锁就直接退出了,这是CPU层面控制的
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
//自旋次数满了之后,失败不能继续自旋,执行下面代码
//更新state字段中存储的不同信息,尝试去获取锁
//state是int32类型,占32位4个字节,下面都是关于字节的运算
// new为锁的新状态
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
//计算新的状态后使用CAS函数更新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
//第一种情况:自旋完后获取到锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }  
//第二种情况:没抢到进入等待队列
//该方法会不断尝试获取锁并陷入休眠等待信号量的释放
//一旦当前可以获取信号量则立即返回,被锁住的代码执行即sync.Mutex.Lock()
//这也是一个CPU层面执行的代码,即使被唤醒也要和新来的g抢锁
//抢锁规则取决于锁的模式
//饥饿模式下该g会直接获得锁
// 正常模式下公平竞争
//抢不到再一次入队等待信号量
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//根据等待时间来修改锁的状态,一般超过1ms就进入饥饿模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
//饥饿模式下不管新来的g,只消耗队列中的g
//队列中的g等待时间小于1ms也会切回正常模式
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果等待队列中只存在当前一个g,修改锁状态,退出饥饿模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }

runtime_canSpin返回true的条件为:
1.运行在多cpu机器上面
2.当前g为了获取锁进入自旋的次数小于4次
3.当前机器至少存在一个正在运行的处理器P且处理的运行队列为空
runtime_doSpin()是执行CPU层面的指令,该指令会占用并消耗CPU时间

func sync_runtime_canSpin(i int) bool {
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

小结一下加锁:

  1. 先是快速路径尝试,直接CAS尝试修改Mutex.state = mutexLocked加锁
  2. 当新来的g发现锁被锁住mutexLocked且在普通模式下,进入自旋,消耗CPU时间等待锁释放(这意味着新来的g有更多机会拿到锁)。自旋是拿锁的一种方式是快路径,通过控制次数来退出自旋,自旋失败后加入等待队列,这是慢路径
    3.被唤醒后没人持有锁就直接给你,否则抢锁,抢锁是不公平的!,根据锁的模式来决定,饥饿模式mutexStarving就直接给他,正常模式就公平的和新来的g(在自旋中的g)抢锁,没抢到就回去重新排队,等待时间决定了是否进入饥饿模式,一般超过1ms,如果当前g是最后一个等待的,或者等待时间小于1ms切回正常模式
    抢锁

解锁unLock

//mutexLocked = 1
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}
  • 返回0就说明解锁成功了
  • 不为0就调用unlockSlow ,为什么会不为0,因为锁有正常模式和饥饿模式都会修改Mutex.state的值呀
func (m *Mutex) unlockSlow(new int32) {
    if new&mutexStarving == 0 {
        //处理正常模式
    } else {
        //处理饥饿模式
        runtime_Semrelease(&m.sema, true, 1)
    }
}

这里我们同时看到锁上并没有记录被哪个协程持有,这意味着unlock方法可以被任意g调用(造成死锁的原因),同时对一个未上锁的锁调用解锁会引发panic

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }

因此使用锁需要遵循“谁申请谁释放原则”

2.1.3扩展mutex

扩展一下让锁知道被哪个协程使用
思路:

  • 通过runtime.stack获取协程唯一id记录到锁中,如果释放时发现不是当前id释放就panic
  • 每个协程生成唯一token标识唯一性

2.2RWMutex

RWMutex 跟cancelCtx类似对Mutex使用装饰模式

type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}
  • sem见过了,用于等待读写操作
  • readerCount 正在执行读的操作数量,readerCount是负值的时候意味着此时有写请求等待(当第一个请求是先写请求过来时,readerCount直接被置为一个很小的负数)
  • readerWait 正在写被阻塞时等待的读操作数量

2.2.1写锁的加锁和解锁

调用w.Lock 用互斥锁的形式阻塞后续操作,因为互斥锁被获取,后续g获取时只能自旋或排队

//readerCount atomic.Int32
func (rw *RWMutex) Lock() {

    rw.w.Lock()
    // 原子操作方式获取持有读锁的数量
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
//如果其他g持有该互斥锁的读锁,
//进入等待等所有读操作结束后发送释放信号量,唤醒该g
    if r != 0 && rw.readerWait.Add(r) != 0 {
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

解锁,也是对互斥锁的解锁方法进行封装

func (rw *RWMutex) Unlock() {
// 因为前面加锁时减去了rwmutexMaxReaders
// 这里把它加回来变为正数   
    r := rw.readerCount.Add(rwmutexMaxReaders)
// 循环释放因为读锁陷入等待的g,
//读锁之间不冲突,不竞争,谁先醒都无所谓
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()

}

2.2.2读锁的解锁和加锁

加锁,通过上面对写锁的学习我们只读,写锁在加锁时会将readerCount设置为负数(将自己加入),这里负数就说明存在写锁,该g进入休眠等待

func (rw *RWMutex) RLock() {
    if rw.readerCount.Add(1) < 0 {
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

释放读锁就减1,因为读锁会卡住写锁加锁(这里实际是让第一个拿到写锁的g进入睡眠),因此当readerCount为负数需要发起唤醒信号量(大于等于0说明解锁成功)

func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
        rw.rUnlockSlow(r)
    }

}

func (rw *RWMutex) rUnlockSlow(r int32) {
//减少写锁等待的读锁数量,唤醒第一个拿到写锁的协程g
    if rw.readerWait.Add(-1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

小结:

  • 实际上只有写锁才是真正对互斥锁的操作
  • 读锁大多负责休眠和唤醒g,不碰互斥锁,因此读锁之间不互相竞争

2.3 trylock

RW的trylock都是对Mutex的trylock方法进行封装

func (m *Mutex) TryLock() bool {
    old := m.state
    if old&(mutexLocked|mutexStarving) != 0 {
        return false
    }

    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
        return false
    }

    return true
}

就是进行一次的尝试修改 state,也不执行lockslow进入自旋

3.锁的错误使用场景

3.1死锁deadlock

例子1:锁不只有一把,一个协程在已经对一个资源A加锁的情况下,尝试对另一个资源B加锁,资源B已经被另一个协程上锁,好死不死,掌握资源B锁的协程也要对资源A加锁,就陷入了两个
协程互相无限等待释对方放锁的情况

type Student struct {
    sync.RWMutex
    ID   int
    Name string
}

func TestDeadLock(t *testing.T) {
    s1 := &Student{ID: 1, Name: "s1"}
    s2 := &Student{ID: 2, Name: "s2"}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        s1.Lock()
        defer s1.Unlock()
        time.Sleep(time.Second)
        t.Log("s1名字叫:", s1.Name)
        t.Log("我要去获取s2的名字")
        s2.Lock()
        t.Log(s2.Name)
        s2.Unlock()
        wg.Done()
    }()
    go func() {
        s2.Lock()
        defer s2.Unlock()
        time.Sleep(time.Second)
        t.Log("s2名字叫:", s2.Name)
        t.Log("我要去获取s1的名字")
        s1.Lock()
        t.Log(s1.Name)
        s1.Unlock()
        wg.Done()
    }()
    wg.Wait()

}

触发死锁
解决办法:换成原子操作sync.atomic

例子2 :锁只有一把, 函数A ->对资源A加锁-> 调用函数B,函数B也要对A加锁,那么就要等待A解锁,两个也是互相等待,即对一把锁重复上锁,也叫对锁的重入

func A(s *Student) {
    s.Lock()
    defer s.Unlock()
    B(s)
}

func B(s *Student) {
    s.Lock()
    defer s.Unlock()
    fmt.Println(s.Name)
}
func TestChanDeadlock(t *testing.T) {
    s := &Student{Name: "s"}
    A(s)
}

原因我们阅读过源码知道,锁上面不记录持有锁的协程信息,只修改state

3.2解锁和上锁不在同一个方法体当中

就怕万一忘记释放锁,最好的办法还是在上锁后加defer解锁,即使发生panic也能确保解锁

3.3尝试对锁进行拷贝

也是类似于上面死锁案例2的情况

func foo(s Student) {
    s.Lock()
    defer s.Unlock()
    fmt.Println("in foo")
}

func TestFoo(t *testing.T) {
    var c Student
    c.Lock()
    defer c.Unlock()
    foo(c)
}

foo不知道已经上锁了,尝试用lock来获取锁(但是没有其他协程来释放这个赋值的锁),结果主协程被完全阻塞

3.4 go vet工具

利用 vet检测死锁

go vet demo.go 
# command-line-arguments
./demo.go:20:9: call of foo copies lock value: command-line-arguments.Counter
./demo.go:25:12: foo passes lock by value: command-line-arguments.Counter

参考

1.同步原语与锁
2.双重检查
3.自旋锁
4.极客go并发编程实战

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容