读写锁 RWMutex

1、概念

  • 读写锁和互斥锁都是为了防止并发而加的锁
  • 读写锁相比互斥锁,就是增加了对读写的控制
  • 读写锁 写写之间是互斥的,读写也是互斥的,但是读锁 是可以添加多个的

2、RWMutex 代码

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}
  • w : 互斥锁,写 协程获得该锁后,其他协程处于等待
  • writerSem :writer 等待 读完成排队的信号量
  • readerSem : read 等待 write 完成排队的信号量
  • readerCount : 读锁的计数器
  • readerWait : 等待读锁释放的数量

3、常量定义

const rwmutexMaxReaders = 1 << 30
  • go支持的最高加读锁的数量 4294967296,go中对读锁的数量采用负值计算,逐渐递减

4、接口定义

  • RLock():读锁定
  • RUnlock():解除读锁定
  • Lock(): 写锁定,与Mutex完全一致
  • Unlock():解除写锁定,与Mutex完全一致

5、读锁

5.1 读加锁

RLock 代码

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
func (rw *RWMutex) RLock() {
// 竞争检测
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// readerCount 加一
	// 如果小于0,说明目前是写锁定,进入阻塞
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		// 等待写锁释放
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	// 是否开启静态检测
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

流程

在这里插入图片描述

  • 首先进来的时间判断 readerCount 是否小于0.如果小于零,说明目前有写锁进行锁定
  • 进入 runtime_SemacquireMutex 等待写锁释放
  • 反之若 readerCount 大于零,则加锁成功

readerCount

  • 读加锁时用他是否小于零来判断 是否有写锁定
  • reader 是整型数
  • 读锁操作的情况下,加一个读锁会为其加一,释放后会减一。
  • 在写锁操作时,会将 rederCount 2^30,此时就变成负值。在写锁释放的时候会加 2^30
  • 所以,在读加锁的时候可以判断这个数是否小于零来确认是否加了 写锁,如果小于零就阻塞等待
  • 保证了 写操作时不会有读操作

5.2 读解锁

RUnlock 代码

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
// 是否开启 竞争检测
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	// readerCount 数目减一
	// 若结果大于零,则说明还存在读锁锁定,本次解锁就此结束
	// 若结果小于等于0 ,则说明已经没有读锁,可以唤醒写锁
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		// 将goroutine排到G队列的后面,挂起goroutine
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}

rUnlockSlow 代码

func (rw *RWMutex) rUnlockSlow(r int32) {
// 如果r小于零,说明没有读锁锁定,再次解锁会报错
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	// readerWait--操作,如果有写锁,推出在写锁之前产生的读锁
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

流程

在这里插入图片描述

  • 解锁时先 将 readerCount 减一
  • 判断 readerCount 的值
  • 如果 rederCount 大于零,说明还有 读锁在锁定,本地解锁完成
  • 如果readerCount 小于等于0,说明此时已无读锁锁定,如果存在 等待的写锁时,唤醒写锁

6、 写锁

6.1 写加锁

Lock 代码

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	// 获取互斥锁
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	// 如果 r 不为0 ,说明此时有读锁锁定
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	// 等待写锁前面的读锁释放, 所以若不为0就阻塞写锁, 等待rUnlockSlow-rUnlockSlow的readerWait-1直至0倍唤醒写锁
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
	// 阻塞写锁
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

写加锁 流程

在这里插入图片描述

readerWait

  • 读操作可以重复加,必然会导致写锁饿死的情况。readerWait可以完美解决写锁饿死的情况。
  • 当有写锁申请的时候,若 检测到此时有读锁在锁定状态,就会将 readerCount 的值 拷贝readerWait
  • 每次释放一个读锁时,readerWait 中得值也会减一
  • 当 readerWait 中得值变为0 时,就会唤醒写锁
  • 其他的读锁 会一直阻塞,等待写锁释放

一句话总结:写锁优先,当有写锁来的时候,你们这些读的赶紧读,读完我要写。其他后面来的读的,你们先等着,等我写完后你们再读

6.2 写解锁

Unlock 代码

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	// 增加readerCount, 若超过读锁的最大限制, 触发panic
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	//解除阻塞的读锁(若有)
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	// 释放互斥锁
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

写解锁流程

在这里插入图片描述

7. 底层 atomic

可以看出读写锁底层用了很多 atomic.AddInt32 中得 增减方法操作 readerCount和readerWait

概念

  • atomic 支持 CPU操作系统级别的原子操作
  • 支持操作的数据类型有:int32,int64,uint32,uint64,uintptr,unsafe.Pointer
  • 操作的方法有:Add增加和减少,CompareAndSwap比较并交换,Swap交换,Load 读取,Store存储

AddInt32

// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)
  • AddInt32可以实现对元素的原子增加或减少,函数接收两个参数,分别是需要修改的变量的地址和修改的差值,函数会直接在传递的地址上进行修改操作,此外函数会返回修改之后的新值
  • 减少操作的话 需要传入负值