01
介绍
Mutex 互斥锁严格锁定读和写,这在读多写少的场景,未免显得有些「浪费」,在 Go 语言中,sync 包中的 RWMutex 类型可以解决这类问题,RWMutex 是基于 Mutex 实现的,RWMutex 是读写(reader/writer)互斥锁,RWMutex 在某一特定时间内,只能由若干 reader(读操作) 持有锁或只能由单个 writer(写操作) 持有锁。
例如,如果某个执行读操作的 goroutine 持有锁(共享锁),其他读操作的 goroutine 将不会阻塞,而是可以并发访问共享变量,提升读性能;如果某个执行写操作的 goroutine 持有锁(排它锁),其他的 goroutine,无论是执行读操作,还是执行写操作,都会阻塞,直到这个持有锁的写操作 goroutine 释放锁。
02
使用场景
读者通过阅读 Part 01 的内容,相信已经明白,RWMutex 类型适用于读多写少的场景。
如果我们在开始写程序的时候,就可以预估是读多写少的场景,那就直接使用 RWMutex 类型的读写互斥锁,否则,可以先使用 Mutex 类型的互斥锁,后续代码优化的时候,再根据实际情况来看是否可以改用 RWMutex 类型的读写互斥锁来优化代码的读性能。
RWMutex 类型一共有 6 个方法,
通过阅读 Go 源码 /usr/local/go/src/sync/rwmutex.go,
我们可以发现分别是 RLock、RUnlock、rUnlockSlow、Lock、Unlock 和 RLocker。
下面分别介绍一下这几个方法:
RLock/RUnlock:RLock 锁定写操作,如果锁已被写操作持有,RLock 方法会被阻塞,直到锁释放;如果锁已被读操作持有,RLock 方法会直接返回。RUnlock 是读操作对应的释放锁的方法。一般用于读操作的场景。
Lock/Unlock:Lock 锁定读写操作,不管是读操作持有锁,还是写操作持有锁,Lock 方法都会被阻塞,直到锁释放。Unlock 是对应的释放锁方法。一般用于写操作的场景。
rUnlockSlow:检查读操作是否全部释放锁,如果读锁全部释放,才可以唤醒写操作去请求写锁。
RLocker:RLocker 为读操作返回一个 Locker 接口,它的 Lock 方法会调用 RWMutex 类型的 RLock方法,它的 Unlock 方法会调用 RWMutex 类型的 RUnlock方法。
03
实现原理
在 Go 语言中,标准库 sync 包的 RWMutex 类型是采用「写优先」(Write-preferring)的设计,一个写调用持有锁,新的读调用会被阻塞。
RWMutex 类型的字段:
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers } const rwmutexMaxReaders = 1 << 30
阅读源码,可以发现 RWMutex 类型共有 5 个字段,其中一个是 Mutex 类型,剩余 4 个字段是辅助字段。
- w:帮助解决多个写操作竞争锁的问题。
- writerSem:writer 信号量。
- readerSem:reader 信号量。
- readerCount:记录当前 reader 的数量。
- readerWait:记录 waiter 请求锁时,需要等待完成的 reader 数量。
rwmutexMaxReaders 常量,定义的是 reader 的最大值。
RLock 方法:
func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
阅读源码,第 6 行代码,对 readerCount 进行加 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,因为,RWMutex 是采用写优先的方案设计的,此时,需要优先处理 writer 操作,暂时把新来的 reader 阻塞。
RUnlock 方法和 rUnlockSlow 方法:
func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
阅读源码,第 7 行代码,对 readerCount 进行减 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,第 9 行代码,通过调用 rUnlockSlow 方法,检查 reader 是否全部释放读锁了,如果已全部释放读锁,就可以唤醒请求写锁的 writer 了。
通过 Rlock 和 RUnlock 方法的源码,我们可以得出的结论是,writer 请求锁的优先处理权只限定于新 reader,如果在 writer 请求锁时,已有 reader 持有锁,仍然需要等待持有锁的 reader 释放锁。
Lock 方法:
func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
阅读源码,可以发现 RWMutex 的 Lock 方法,使用 Mutex 的 Lock 方法,当有一个 writer 持有互斥锁时,通过将 readerCount 减去 rwmutexMaxReaders 常量,使 readerCount 变为负数,即保存了reader 数量,也代表了当前有 writer 请求锁。
第 9 行代码,还记录了当前持有锁的 reader 的数量,如果持有锁的 reader 的数量不等于 0,第 11 行代码,将 readerCount 赋值给 readerWait,同时当前 writer 进入阻塞状态,等待所有持有锁的 reader 全部释放锁,才会唤醒当前被阻塞的 writer。
Unlock 方法:
func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } }
阅读源码,可以发现当前 writer 释放锁时,第 9 行代码,会通过将 readerCount 加上 rwmutexMaxReaders 常量,将 readerCount 变为正数,代表当前没有 writer 持有锁了,第 15 行代码,开始唤醒阻塞等待的 reader,第 19 行代码,释放互斥锁,等待其他 writer 请求锁。
细心的读者可能已经发现,Lock 方法是先持有互斥锁,再修改字段,Unlock 方法是最后释放互斥锁,再修改字段,采用这种顺序,是为了保证修改字段也受到互斥锁的保护。
04
踩坑
RWMutex 读写互斥锁的锁操作必须成对出现,Lock 和 RLock 操作,如果在未成对调用 Unlock 和 RUnlock 的情况下,重复调用 Lock 和 RLock,因为锁还没有被释放,可能会导致死锁;
Unlock 和 RUnlock 操作,如果在未对调用 Lock 和 RLock 的情况下,直接给一个未加锁的 RWMutex 释放锁,会导致程序 panic。