结构体
type Mutex struct {
state int32 //表示互斥锁的状态,比如是否被锁定等。
sema uint32//表示互斥锁的状态,比如是否被锁定等。
}
const (
mutexLocked = 1 << iota // 表示互斥锁的锁定状态
mutexWoken // 表示从正常模式被从唤醒
mutexStarving // 当前的互斥锁进入饥饿状态
mutexWaiterShift = iota // 当前互斥锁上等待者的数量
)
Mutex.state是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态。
- Locked: 表示该Mutex是否已被锁定,0:没有锁定 1:已被锁定。
- Woken: 表示是否有协程已被唤醒,0:没有协程唤醒1:已有协程唤醒,正在加锁过程中。
- Starving:表示该Mutex是否处于饥饿状态,0:没有饥饿1:饥饿状态,说明有协程阻塞了超过1ms。
- Waiter: 表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。
协程之间抢锁实际上是抢给Locked位赋值的权利,能给Locked位置1,就说明抢锁成功。抢不到的话就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。
Mutex方法
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock() //加锁方法
Unlock()//解锁方法
}
简单加锁
加锁过程会去判断Locked标志位是否为0,如果是0则把Locked位置1,代表加锁成功。从上图可见,加锁成功后,只是Locked位置1,其他状态位没发生变化。
sync_runtime_canSpin(i int)
自旋条件如下:
- 自旋的次数要在4次以内
- CPU必须为多核
- GOMAXPROCS>1
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
const active_spin = 4
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
runtime_doSpin
循环次数被设置为30次,自旋操作就是执行30次PAUSE指令,通过该指令占用CPU并消费CPU时间,进行忙等待;
这就是整个自旋操作的逻辑,这个就是为了优化 等待阻塞->唤醒->参与抢占锁这个过程不高效,所以使用自旋进行优化,在期望在这个过程中锁被释放。
const active_spin_cnt = 30
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
lockSlow
func (m *Mutex) lockSlow() {
var waitStartTime int64 //来计算waiter的等待时间
starving := false //是饥饿模式标志,如果等待时长超过1ms,starving置为true,后续操作会把Mutex也标记为饥饿状态。
awoke := false//表示协程是否唤醒
iter := 0//用于记录协程的自旋次数
old := m.state//记录当前锁的状态
for {
//判断是否允许进入自旋 两个条件:
//1.old&(mutexLocked|mutexStarving) == mutexLocked ==》当前锁不能处于饥饿状态(用来判断锁是否处于正常模式且加锁)
//mutexLocked 二进制表示为 0001
//mutexStarving 二进制表示为 0100
//mutexLocked|mutexStarving 二进制为 0101. 使用0101在当前状态做 &操作,如果当前处于饥饿模式,低三位一定会是1,如果当前处于加锁模式,低1位一定会是1,所以使用该方法就可以判断出当前锁是否处于正常模式且加锁;
//2.runtime_canSpin(iter)==》其逻辑是在多核CPU运行,自旋的次数小于4
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke 判断当前goroutine不是在唤醒状态
// old&mutexWoken == 0 表示没有其他正在唤醒的goroutine
// old>>mutexWaiterShift != 0 表示等待队列中有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将当前锁的低2位的Woken状态位设置为1,表示已被唤醒, 这是为了通知在解锁Unlock()中不要再唤醒其他的waiter了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//判断当前goroutine可以进自旋后,调用runtime_doSpin方法进行自旋:
runtime_doSpin()
iter++
old = m.state
continue
}
//--------------------抢锁准备期望状态------------------------
//自旋逻辑处理好后开始根据上下文计算当前互斥锁最新的状态,根据不同的条件来计算mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
//首先计算mutexLocked的值:
// 基于old状态声明到一个新状态
new := old
// 新状态处于非饥饿的条件下才可以加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
//计算mutexWaiterShift的值:
//如果old已经处于加锁或者饥饿状态,则等待者按照FIFO的顺序排队
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//计算mutexStarving的值:
如果当前锁处于饥饿模式,并且已被加锁,则将低3位的Starving状态位设置为1,表示饥饿
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//计算mutexWoken的值:
// 当前goroutine的waiter被唤醒,则重置flag
if awoke {
// 唤醒状态不一致,直接抛出异常
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 新状态清除唤醒标记,因为后面的goroutine只会阻塞或者抢锁成功
// 如果是挂起状态,那就需要等待其他释放锁的goroutine来唤醒。
// 假如其他goroutine在unlock的时候发现Woken的位置不是0,则就不会去唤醒,那该goroutine就无法在被唤醒后加锁
new &^= mutexWoken
}
//-----------------------通过CAS操作更新期望状态------------------------------------------
//这块的逻辑很复杂,通过CAS来判断是否获取到锁,没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex通过信号量保证资源不会被
//两个 goroutine 获取,runtime.sync_runtime_SemacquireMutex会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 goroutine 可
//以获取信号量,它就会立刻返回,如果是新来的goroutine,就需要放在队尾;如果是被唤醒的等待锁的goroutine,就放在队头,整个过程还需要啃代码来加深理解。
//上面我们已经得到了锁的期望状态,接下来通过CAS将锁的状态进行更新:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果原来锁的状态是没有加锁的并且不处于饥饿状态,则表示当前goroutine已经获取到锁了,直接推出即可
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 到这里就表示goroutine还没有获取到锁,waitStartTime是goroutine开始等待的时间,waitStartTime != 0就表示当前goroutine已经等待过了,则需要将其放置在等待队列队头,否则就排到队列队尾
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 被信号量唤醒后检查当前goroutine是否应该表示为饥饿
// 1. 当前goroutine已经饥饿
// 2. goroutine已经等待了1ms以上
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 再次获取当前锁的状态
old = m.state
// 如果当前处于饥饿模式,
if old&mutexStarving != 0 {
// 如果当前锁既不是被获取也不是被唤醒状态,或者等待队列为空 这代表锁状态产生了不一致的问题
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前goroutine已经获取了锁,等待队列-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 当前goroutine非饥饿状态 或者 等待队列只剩下一个waiter,则退出饥饿模式(清除饥饿标识位)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新状态值并中止for循环,拿到锁退出
atomic.AddInt32(&m.state, delta)
break
}
// 设置当前goroutine为唤醒状态,且重置自璇次数
awoke = true
iter = 0
} else {
// 锁被其他goroutine占用了,还原状态继续for循环
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
加锁被阻塞
假定加锁时,锁已被其他协程占用了,此时加锁过程如下图所示:
从上图可看到,当协程B对一个已被占用的锁再次加锁时,Waiter计数器增加了1,此时协程B将被阻塞,直到Locked值变为0后才会被唤醒。
简单解锁
假定解锁时,没有其他协程阻塞,此时解锁过程如下图所示:
由于没有其他协程阻塞等待加锁,所以此时解锁时只需要把Locked位置为0即可,不需要释放信号量。
解锁并唤醒协程
假定解锁时,有1个或多个协程阻塞,此时解锁过程如下图所示:
Unlock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
//使用AddInt32方法快速进行解锁,将m.state的低1位置为0,
new := atomic.AddInt32(&m.state, -mutexLocked)
//然后判断新的m.state值,如果值为0,则代表当前锁已经完全空闲了,结束解锁,不等于0说明当前锁没有被占用
if new != 0 {
//会有等待的goroutine还未被唤醒,需要进行一系列唤醒操作,这部分逻辑就在unlockSlow方法内:
m.unlockSlow(new)
}
}
TryLock 非阻塞加锁
TryLock的实现就比较简单了,主要就是两个判断逻辑:
- 判断当前锁的状态,如果锁处于加锁状态或饥饿状态直接获取锁失败
- 尝试获取锁,获取失败直接获取锁失败
TryLock并不被鼓励使用,至少我还没想到有什么场景可以使用到它。
func (m *Mutex) TryLock() bool {
// // 记录当前状态
old := m.state
处于加锁状态/饥饿状态直接获取锁失败
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 尝试获取锁,获取失败直接获取失败
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
unlockSlow
我们在唤醒goroutine时正常模式/饥饿模式都调用func runtime_Semrelease(s *uint32, handoff bool, skipframes int),这两种模式在第二个参数的传参上不同,如果handoff is true, pass count directly to the first waiter.。
func (m *Mutex) unlockSlow(new int32) {
/// 这里表示解锁了一个没有上锁的锁,则直接发生panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
正常模式的释放锁逻辑
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待者则直接返回即可
// 如果锁处于加锁的状态,表示已经有goroutine获取到了锁,可以返回
// 如果锁处于唤醒状态,这表明有等待的goroutine被唤醒了,不用尝试获取其他goroutine了
// 如果锁处于饥饿模式,锁之后会直接给等待队头goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 抢占唤醒标志位,这里是想要把锁的状态设置为被唤醒,然后waiter队列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 抢占成功唤醒一个goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行抢占不成功时重新更新一下状态信息,下次for循环继续处理
old = m.state
}
} else {
// 饥饿模式释放锁逻辑,直接唤醒等待队列goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
协程A解锁过程分为两个步骤,一是把Locked位置0,二是查看到Waiter>0,所以释放一个信号量,唤醒一个阻塞的协程,被唤醒的协程B把Locked位置1,于是协程B获得锁。
自旋
什么是自旋
自旋对应于CPU的”PAUSE”指令,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期。
自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态。
自旋条件
加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。
自旋必须满足以下所有条件:
- 自旋次数要足够小,通常为4,即自旋最多4次
- CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
- 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋
- 协程调度机制中的可运行队列必须为空,否则会延迟协程调度
可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。
自旋的优势
自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。
自旋的问题
如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。
为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。
Starving位
每个Mutex都有两个模式,称为Normal和Starving。
Normal
默认情况下,Mutex的模式为normal。
该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。
starvation模式
自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
但是这个时候的starvation模式,要在下一次相同情况时(有别的协程在自旋过程中刚好锁被释放),才会体现starvation模式的作用。即别的协程就算在自旋,也不能获得锁,只有去阻塞,因为当前处于Mutex饥饿状态。
处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。
为什么重复解锁要panic
可能你会想,为什么Go不能实现得更健壮些,多次执行Unlock()也不要panic?
仔细想想Unlock的逻辑就可以理解,这实际上很难做到。Unlock过程分为将Locked置为0,然后判断Waiter值,如果值>0,则释放信号量。
如果多次Unlock(),那么可能每次都释放一个信号量,这样会唤醒多个协程,多个协程唤醒后会继续在Lock()的逻辑里抢锁,势必会增加Lock()实现的复杂度,也会引起不必要的协程切换。
总结- 互斥锁有两种模式:正常模式、饥饿模式,饥饿模式的出现是为了优化正常模式下刚被唤起的goroutine与新创建的goroutine竞争时长时间获取不到锁,在Go1.9时引入饥饿模式,如果一个goroutine获取锁失败超过1ms,则会将Mutex切换为饥饿模式,如果一个goroutine获得了锁,并且他在等待队列队尾 或者 他等待小于1ms,则会将Mutex的模式切换回正常模式
- 加锁的过程:
- 锁处于完全空闲状态,通过CAS直接加锁
- 当锁处于正常模式、加锁状态下,并且符合自旋条件,则会尝试最多4次的自旋
- 若当前goroutine不满足自旋条件时,计算当前goroutine的锁期望状态
- 尝试使用CAS更新锁状态,若更新锁状态成功判断当前goroutine是否可以获取到锁,获取到锁直接退出即可,若不同获取到锁子则陷入睡眠,等待被唤醒
- goroutine被唤醒后,如果锁处于饥饿模式,则直接拿到锁,否则重置自旋次数、标志唤醒位,重新走for循环自旋、获取锁逻辑;
- 解锁的过程
- 原子操作mutexLocked,如果锁为完全空闲状态,直接解锁成功
- 如果锁不是完全空闲状态,,那么进入unlockedslow逻辑
- 如果解锁一个未上锁的锁直接panic,因为没加锁mutexLocked的值为0,解锁时进行mutexLocked - 1操作,这个操作会让整个互斥锁魂村,所以需要有这个判断
- 如果锁处于饥饿模式直接唤醒等待队列队头的waiter
- 如果锁处于正常模式下,没有等待的goroutine可以直接退出,如果锁已经处于锁定状态、唤醒状态、饥饿模式则可以直接退出,因为已经有被唤醒的 goroutine 获得了锁.
- 使用互斥锁时切记拷贝Mutex,因为拷贝Mutex时会连带状态一起拷贝,因为Lock时只有锁在完全空闲时才会获取锁成功,拷贝时连带状态一起拷贝后,会造成死锁
- TryLock的实现逻辑很简单,主要判断当前锁处于加锁状态、饥饿模式就会直接获取锁失败,尝试获取锁失败直接返回;
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}