01

介绍

Mutex 互斥锁严格锁定读和写,这在读多写少的场景,未免显得有些「浪费」,在 Go 语言中,sync 包中的 RWMutex 类型可以解决这类问题,RWMutex 是基于 Mutex 实现的,RWMutex 是读写(reader/writer)互斥锁,RWMutex 在某一特定时间内,只能由若干 reader(读操作) 持有锁或只能由单个 writer(写操作) 持有锁。

例如,如果某个执行读操作的 goroutine 持有锁(共享锁),其他读操作的 goroutine 将不会阻塞,而是可以并发访问共享变量,提升读性能;如果某个执行写操作的 goroutine 持有锁(排它锁),其他的 goroutine,无论是执行读操作,还是执行写操作,都会阻塞,直到这个持有锁的写操作 goroutine 释放锁。

02

使用场景

读者通过阅读 Part 01 的内容,相信已经明白,RWMutex 类型适用于读多写少的场景。

如果我们在开始写程序的时候,就可以预估是读多写少的场景,那就直接使用 RWMutex 类型的读写互斥锁,否则,可以先使用 Mutex 类型的互斥锁,后续代码优化的时候,再根据实际情况来看是否可以改用 RWMutex 类型的读写互斥锁来优化代码的读性能。

RWMutex 类型一共有 6 个方法,

通过阅读 Go 源码 /usr/local/go/src/sync/rwmutex.go,

我们可以发现分别是 RLock、RUnlock、rUnlockSlow、Lock、Unlock 和 RLocker。

下面分别介绍一下这几个方法:

RLock/RUnlock:RLock 锁定写操作,如果锁已被写操作持有,RLock 方法会被阻塞,直到锁释放;如果锁已被读操作持有,RLock 方法会直接返回。RUnlock 是读操作对应的释放锁的方法。一般用于读操作的场景。

Lock/Unlock:Lock 锁定读写操作,不管是读操作持有锁,还是写操作持有锁,Lock 方法都会被阻塞,直到锁释放。Unlock 是对应的释放锁方法。一般用于写操作的场景。

rUnlockSlow:检查读操作是否全部释放锁,如果读锁全部释放,才可以唤醒写操作去请求写锁。

RLocker:RLocker 为读操作返回一个 Locker 接口,它的 Lock 方法会调用 RWMutex 类型的 RLock方法,它的 Unlock 方法会调用 RWMutex 类型的 RUnlock方法。

03

实现原理

在 Go 语言中,标准库 sync 包的 RWMutex 类型是采用「写优先」(Write-preferring)的设计,一个写调用持有锁,新的读调用会被阻塞。

RWMutex 类型的字段:

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30

阅读源码,可以发现 RWMutex 类型共有 5 个字段,其中一个是 Mutex 类型,剩余 4 个字段是辅助字段。

  • w:帮助解决多个写操作竞争锁的问题。
  • writerSem:writer 信号量。
  • readerSem:reader 信号量。
  • readerCount:记录当前 reader 的数量。
  • readerWait:记录 waiter 请求锁时,需要等待完成的 reader 数量。

rwmutexMaxReaders 常量,定义的是 reader 的最大值。

RLock 方法:

func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}

阅读源码,第 6 行代码,对 readerCount 进行加 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,因为,RWMutex 是采用写优先的方案设计的,此时,需要优先处理 writer 操作,暂时把新来的 reader 阻塞。

RUnlock 方法和 rUnlockSlow 方法:

func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

阅读源码,第 7 行代码,对 readerCount 进行减 1 操作,如果 readerCount 的值为负数,代表此时有 writer 等待请求锁,第 9 行代码,通过调用 rUnlockSlow 方法,检查 reader 是否全部释放读锁了,如果已全部释放读锁,就可以唤醒请求写锁的 writer 了。

通过 Rlock 和 RUnlock 方法的源码,我们可以得出的结论是,writer 请求锁的优先处理权只限定于新 reader,如果在 writer 请求锁时,已有 reader 持有锁,仍然需要等待持有锁的 reader 释放锁。

Lock 方法:

func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

阅读源码,可以发现 RWMutex 的 Lock 方法,使用 Mutex 的 Lock 方法,当有一个 writer 持有互斥锁时,通过将 readerCount 减去 rwmutexMaxReaders 常量,使 readerCount 变为负数,即保存了reader 数量,也代表了当前有 writer 请求锁。

第 9 行代码,还记录了当前持有锁的 reader 的数量,如果持有锁的 reader 的数量不等于 0,第 11 行代码,将 readerCount 赋值给 readerWait,同时当前 writer 进入阻塞状态,等待所有持有锁的 reader 全部释放锁,才会唤醒当前被阻塞的 writer。

Unlock 方法:

func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}

阅读源码,可以发现当前 writer 释放锁时,第 9 行代码,会通过将 readerCount 加上 rwmutexMaxReaders 常量,将 readerCount 变为正数,代表当前没有 writer 持有锁了,第 15 行代码,开始唤醒阻塞等待的 reader,第 19 行代码,释放互斥锁,等待其他 writer 请求锁。

细心的读者可能已经发现,Lock 方法是先持有互斥锁,再修改字段,Unlock 方法是最后释放互斥锁,再修改字段,采用这种顺序,是为了保证修改字段也受到互斥锁的保护。

04

踩坑

RWMutex 读写互斥锁的锁操作必须成对出现,Lock 和 RLock 操作,如果在未成对调用 Unlock 和 RUnlock 的情况下,重复调用 Lock 和 RLock,因为锁还没有被释放,可能会导致死锁;

Unlock 和 RUnlock 操作,如果在未对调用 Lock 和 RLock 的情况下,直接给一个未加锁的 RWMutex 释放锁,会导致程序 panic。