1、概念
- 读写锁和互斥锁都是为了防止并发而加的锁
- 读写锁相比互斥锁,就是增加了对读写的控制
- 读写锁 写写之间是互斥的,读写也是互斥的,但是读锁 是可以添加多个的
2、RWMutex 代码
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- w : 互斥锁,写 协程获得该锁后,其他协程处于等待
- writerSem :writer 等待 读完成排队的信号量
- readerSem : read 等待 write 完成排队的信号量
- readerCount : 读锁的计数器
- readerWait : 等待读锁释放的数量
3、常量定义
const rwmutexMaxReaders = 1 << 30
- go支持的最高加读锁的数量 4294967296,go中对读锁的数量采用负值计算,逐渐递减
4、接口定义
- RLock():读锁定
- RUnlock():解除读锁定
- Lock(): 写锁定,与Mutex完全一致
- Unlock():解除写锁定,与Mutex完全一致
5、读锁
5.1 读加锁
RLock 代码
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
// 竞争检测
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// readerCount 加一
// 如果小于0,说明目前是写锁定,进入阻塞
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
// 等待写锁释放
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// 是否开启静态检测
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
流程
- 首先进来的时间判断 readerCount 是否小于0.如果小于零,说明目前有写锁进行锁定
- 进入 runtime_SemacquireMutex 等待写锁释放
- 反之若 readerCount 大于零,则加锁成功
readerCount
- 读加锁时用他是否小于零来判断 是否有写锁定
- reader 是整型数
- 在读锁操作的情况下,加一个读锁会为其加一,释放后会减一。
- 在写锁操作时,会将 rederCount 减 2^30,此时就变成负值。在写锁释放的时候会加 2^30
- 所以,在读加锁的时候可以判断这个数是否小于零来确认是否加了 写锁,如果小于零就阻塞等待
- 保证了 写操作时不会有读操作
5.2 读解锁
RUnlock 代码
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
// 是否开启 竞争检测
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// readerCount 数目减一
// 若结果大于零,则说明还存在读锁锁定,本次解锁就此结束
// 若结果小于等于0 ,则说明已经没有读锁,可以唤醒写锁
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
// 将goroutine排到G队列的后面,挂起goroutine
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
rUnlockSlow 代码
func (rw *RWMutex) rUnlockSlow(r int32) {
// 如果r小于零,说明没有读锁锁定,再次解锁会报错
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
// readerWait--操作,如果有写锁,推出在写锁之前产生的读锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
流程
- 解锁时先 将 readerCount 减一
- 判断 readerCount 的值
- 如果 rederCount 大于零,说明还有 读锁在锁定,本地解锁完成
- 如果readerCount 小于等于0,说明此时已无读锁锁定,如果存在 等待的写锁时,唤醒写锁
6、 写锁
6.1 写加锁
Lock 代码
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
// 获取互斥锁
rw.w.Lock()
// Announce to readers there is a pending writer.
// 如果 r 不为0 ,说明此时有读锁锁定
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
// 等待写锁前面的读锁释放, 所以若不为0就阻塞写锁, 等待rUnlockSlow-rUnlockSlow的readerWait-1直至0倍唤醒写锁
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 阻塞写锁
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
写加锁 流程
readerWait
- 读操作可以重复加,必然会导致写锁饿死的情况。readerWait可以完美解决写锁饿死的情况。
- 当有写锁申请的时候,若 检测到此时有读锁在锁定状态,就会将 readerCount 的值 拷贝到 readerWait 中
- 每次释放一个读锁时,readerWait 中得值也会减一
- 当 readerWait 中得值变为0 时,就会唤醒写锁
- 其他的读锁 会一直阻塞,等待写锁释放
一句话总结:写锁优先,当有写锁来的时候,你们这些读的赶紧读,读完我要写。其他后面来的读的,你们先等着,等我写完后你们再读
6.2 写解锁
Unlock 代码
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
// 增加readerCount, 若超过读锁的最大限制, 触发panic
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
//解除阻塞的读锁(若有)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
// 释放互斥锁
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
写解锁流程
7. 底层 atomic
可以看出读写锁底层用了很多 atomic.AddInt32 中得 增减方法操作 readerCount和readerWait
概念
- atomic 支持 CPU和操作系统级别的原子操作
- 支持操作的数据类型有:int32,int64,uint32,uint64,uintptr,unsafe.Pointer
- 操作的方法有:Add增加和减少,CompareAndSwap比较并交换,Swap交换,Load 读取,Store存储
AddInt32
// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)
- AddInt32可以实现对元素的原子增加或减少,函数接收两个参数,分别是需要修改的变量的地址和修改的差值,函数会直接在传递的地址上进行修改操作,此外函数会返回修改之后的新值
- 减少操作的话 需要传入负值