读写锁(RWMutex)是在互斥锁(Mutex)的基础上构建的。Mutex笔记见另一篇
那么为什么需要读写锁呢?试想一下,在读多写少的场景下,mutex只允许一个goroutine进行读操作,而读操作往往是不会修改目标数据的。那我们为什么不允许多个goroutine并发执行读操作以提高效率呢,所以就有了读写锁。读锁和写锁互斥,但读锁和读锁不互斥。
下面是读锁和写锁的兼容性,是不是很像innoDB的排他锁和共享锁呢
. | rlock | lock |
---|---|---|
rlock | v | x |
lock | x | x |
数据结构
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // 唤醒写锁的信号
readerSem uint32 // 唤醒读锁的信号
readerCount int32 // 当前正在进行读操作的个数, 已经获取读锁 + 还没获取读锁
readerWait int32 // 获取写锁前需要等待多少个读锁解锁
}
RLock
func (rw *RWMutex) RLock() {
.....
// 读锁个数 + 1, 在什么情况下会<0呢?在有写锁被获取的时候
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// sleep 等待写锁释放时唤醒
runtime_SemacquireMutex(&rw.readerSem, false)
}
.....
}
Lock
func (rw *RWMutex) Lock() {
.....
// 写锁用到了互斥锁,所以同时只会有一把写锁
rw.w.Lock()
// 读锁个数(readerCount)瞬间减去一个非常大的数(const rwmutexMaxReaders = 1 << 30),为什么需要这样做呢?
// 其实在上面RLock()中是通过判断readerCount来决定是否可以获取读锁的,当readerCount变成一个非常小的负数时会导致无法获取到读锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 这里的r其实就是原来的readerCount。r!=0 说明还有读锁未释放
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// sleep 等待读锁释放唤醒
runtime_SemacquireMutex(&rw.writerSem, false)
}
......
}
RUnlock
func (rw *RWMutex) RUnlock() {
....
// readerCount - 1, 什么时候 r < 0呢? 解锁一个未加锁的锁
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// r+1 == 0 解锁了一个空闲的锁。r+1 == -rwmutexMaxReaders 解锁了一个写锁
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 写锁前面等待释放的读锁释放完了,唤醒写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false)
}
}
.....
}
从上面可以看锁其实也是有先后顺序的。虽然读锁和读锁兼容,但并不是一直存在读操作写锁就没机会执行了。
Unlock
func (rw *RWMutex) Unlock() {
.....
// lock()的时候减了一个非常大的数,现在加回去
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 正常情况下r应该远小于rwmutexMaxReaders。这里说明解锁了一个未锁的锁
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 循环唤醒读锁
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 解锁互斥锁
rw.w.Unlock()
......
}
后记
读写锁整体来说还是比较简单的,没有啥繁琐的逻辑。