1、互斥锁概念
- 对于同一份资源来说,为了保证 在多个线程或者协程同时访问的时候,保证数据的读取不会出现错误。
- 互斥锁相对于 读写锁而言更加绝对一些,互斥锁不论是读写,加过锁后,都是互斥的,需要等待
2、Mutex 数据结构
type Mutex struct {
state int32 // 互斥锁状态
sema uint32 // 信号量
}
- sema 表示信号量,用来控制 goroutine 的 阻塞休眠和唤醒。协程阻塞时等待该信号量,协程解锁是释放该信号量并唤醒等待信号量的协程
- state 表示 互斥锁的状态,0表示 未上锁,1表示上锁状态
3、state 状态组成
state 一共32位,最低三位依次是 Locked、Woken、Starving,剩下的其他位表示 等待锁的goroutine
- Waiter 表示等待锁释放的goroutine数量
- starving 表示当前锁释放处于饥饿状态,0 不饿 1 饥饿。饥饿状态说是有协程阻塞超过了1ms
- woken 表示唤醒状态,0 没唤醒,1 唤醒状态。唤醒状态表示正在加锁
- locked 表述是否锁定 0 未锁定 1 已锁定
协程之间抢占锁实际就是看谁能给locked 赋值。赋值为1说明抢占成功,抢不到的话就是0.
抢不到的话就等待 sema 的信号量,一旦有协程解锁,等待的协程就会被依次唤醒
woken 和 starving 主要用户控制协程之间的抢锁过程
4、Mutex 的方法
Mutex 对外仅暴露两个方法,lock 和 unlock。加红锁的是私有的方法
Lock
加锁过程
1、无阻塞加锁图解
当前锁是空闲状态,且没有协程抢占。
加锁前判断Locked 是否是 0 , 如果是0 的话 改为 1 ,加锁成功。
仅 Locked 状态位变为1 ,其他位置不变化
2、阻塞加锁图解
协程B加锁的时候,发现该锁已经被协程A占用了
此时 Waiter 计数器会增加1,协程B 被阻塞,知道协程A释放锁。Locked 变为0 ,协程B才会被唤醒
加锁代码
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
- 首先使用 CAS 方法(CompareAndSwapInt32)判断是否可以获取资源。如果可以获取资源的话,就直接修改 Locked 的值,如果获取不到的话就 执行 lockSlow() 方法,自旋抢锁,直到拿到锁
1、CompareAndSwapInt32 方法介绍
-
CAS 是原子操作的一种,是在包 sync.atomic 下面
-
原子操作即是进行过程中不能被中断的操作。针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。
为了实现这样的严谨性,原子操作仅会由一个独立的CPU指令代表和完成
-
对应的其他方法还有 增或减(add)、比较并交换(CompareAndSwap)、载入(Load)、存储(Store)、交换(Swap)
-
对应的操作类型 :int32,int64,uint32,uint64,uintptr,unsafe.Pointer
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
2、lockSlow 方法
加锁失败后就会进入 lockSlow 方法,自旋抢锁
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 协程等待时间
starving := false // 锁的模式
awoke := false // 循环标记
iter := 0 // 计数器
old := m.state // 当前锁的状态
for {
// 当old&0101 == 0001 等于 1 就是已经在锁定状态,第一位是1,第三位是0,未处于饥饿状态,开始自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// woken 未更新,也就是未唤醒,woken 仍然为0
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 将当前协程标识为 唤醒状态。执行自旋操作,计数器+1,当前状态更新到 OLD
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 新来的协程,需要排队,第三位为0
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 当 old 的 第1和3 位为1 时,为饥饿模式,需要排队
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 切换到饥饿模式,解锁时不需要
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 唤醒
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 互斥锁状态不同需要 panic
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 同时清除 唤醒状态位
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果 old 的 第一位不是1,且处于饥饿状态,获取锁成功
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
// 唤醒后抢锁失败了,重新放到队列的首部
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 进入休眠状态,等到信号唤醒
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 确认当前锁的状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
// 饥饿模式不会出现mutex 被锁住,等待队列部位0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 拿到锁后,等待数-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 非饥饿模式,等待者只有一个时,退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
// 更新状态,高位原子计数,直接添加
atomic.AddInt32(&m.state, delta)
break
}
// awoke=true,不处于饥饿模式,新到达的协程先获得锁
awoke = true
iter = 0
} else {
// old = m.state,自旋没成功,更新new,记录当前的状态
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
自旋
1、概念
-
旋箱相当于 CPU 的 “PAUSE”,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期。
-
自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态。
2、自旋过程
-
加锁时,如果当前Locked位为1,说明该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测Locked位是否变为0,这个过程即为自旋过程。
-
自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。此时即便有协程被唤醒也无法获取锁,只能再次阻塞。
-
自旋的好处是,当加锁失败时不必立即转入阻塞,有一定机会获取到锁,这样可以避免协程的切换。
3、自旋的优势
- 自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。
4、自旋的条件
-
加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。
自旋必须满足以下所有条件:
-
自旋次数要足够小,通常为4,即自旋最多4次
-CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁 -
协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋
-
协程调度机制中的可运行队列必须为空,否则会延迟协程调度
-
可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。
5、加锁失败后进入自旋可能存在的问题
-
如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。
-
为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。
unLock
解锁过程
1、无阻塞解锁
- 没有其他协程阻塞等待加锁时,此时直接把 Locked 位置改为0即可,不需要等待释放信号量
- 这种情况 仅改变 Locked 一个标志位
2、阻塞解锁
- 当解锁时观察到 Waiter 是大于0 的,说明有协程在等待锁的释放。
- 先将锁的 Locked 改为0,表示解锁。然后释放信号量,唤醒在等待的协程,被唤醒的协程进行加锁,将 Locked 改为1,表示再次加锁成功
- 这种阻塞的解锁时,需要更做两件事情,一个是修改Locked 另一个是释放信号量
解锁代码
1、 unLock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
解锁时先更新 第一位锁标识 Locked 为0,如果更新失败,会进入方法 unlockSlow 进行解锁操作
2、unlockSlow
func (m *Mutex) unlockSlow(new int32) {
// 当锁没有上锁时,Locked 为 0 ,panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 当处于饥饿模式下,直接唤起等待队列的首个协程
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有在等待的协程,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 等待者数量减一,将唤醒位改为1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式下,将持有锁交给下一个等待者,此时mutexLocked还为0,但是在饥饿模式下,新协程不会更新mutexLocked位
runtime_Semrelease(&m.sema, true, 1)
}
}
5、概念补充
5.1 normal 模式
- 默认情况下,Mutex的模式为normal
- 该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁
5.2 starvation 模式
- 如果一个协程在等待解锁,结果,每次一释放锁就被 自旋 机制的协程抢走,再次进去阻塞。一直无法获取锁。
- 阻塞前会判断 本次阻塞经过了多长时间,如果超过 1ms,就会进入饥饿模式,然后再阻塞
- 在饥饿模式下,不会进入 自旋,就是一旦有协程释放了锁,就会唤醒等待队列中得协程,同时把 Waiter 数量减一
5.3 Woken 状态
- woken 用于解锁和加锁之间的通信。
- 当一个协程加锁失败进入自旋时,会将 Woken 置为1
- 此时如果正好有一个协程在解锁。就会通知解锁的协程,仅仅解锁就可以了,不必要释放信号量,我自己会通过自旋获取锁