目录
- 引言
- Mutex结构
- 饥饿模式和正常模式
- 正常模式
- 饥饿模式
- 状态的切换
- 加锁和解锁
- 加锁
- 自旋
- 计算锁的新状态
- 更新锁状态
- 解锁
- 可能遇到的问题
- 锁拷贝
- panic导致没有unlock
引言
Golang的并发编程令人着迷,使用轻量的协程、基于CSP的channel、简单的go func()就可以开始并发编程,在并发编程中,往往离不开锁的概念。
sync.Mutex
Mutex结构
sync/mutex.go
type Mutex struct { state int32 sema uint32 }
statesema
互斥锁的状态定义在常量中:
const ( mutexLocked = 1 << iota // 1 ,处于锁定状态; 2^0 mutexWoken // 2 ;从正常模式被从唤醒; 2^1 mutexStarving // 4 ;处于饥饿状态; 2^2 mutexWaiterShift = iota // 3 ;获得互斥锁上等待的Goroutine个数需要左移的位数: 1 << mutexWaiterShift starvationThresholdNs = 1e6 // 锁进入饥饿状态的等待时间 )
0即其他状态。
sema
一个mutex对象仅占用8个字节,让人不禁感叹其设计的巧妙
饥饿模式和正常模式
正常模式
在正常模式下,等待的协程会按照先进先出的顺序得到锁 在正常模式下,刚被唤醒的goroutine与新创建的goroutine竞争时,大概率无法获得锁。
饥饿模式
为了避免正常模式下,goroutine被“饿死”的情况,go在1.19版本引入了饥饿模式,保证了Mutex的公平性
在饥饿模式中,互斥锁会直接交给等待队列最前面的goroutine。新的goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
状态的切换
在正常模式下,一旦Goroutine超过1ms没有获取到锁,它就会将当前互斥锁切换饥饿模式
如果一个goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
加锁和解锁
加锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // 原注释: Slow path (outlined so that the fast path can be inlined) // 将 m.lockSlow() }
可以看到,当前互斥锁的状态为0时,尝试将当前锁状态设置为更新锁定状态,且这些操作是原子的。
lockSlow
var waitStartTime int64 starving := false // awoke := false iter := 0 old := m.state
随后进入一个很大的for循环,让我们来逐步分析
自旋
for { // 1 && 2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 3. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
old&(mutexLocked|mutexStarving) == mutexLocked
当且仅当当前锁状态为mutexLocked时,表达式为true
runtime_canSpin(iter)
- 运行在拥有多个CPU的机器上;
- 当前Goroutine为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P,并且处理的运行队列为空;
awokemutexWoken
runtime_doSpin()PAUSE
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
计算锁的新状态
停止了自旋后,
new := old // 1. if old&mutexStarving == 0 { new |= mutexLocked } // 2. if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 3 && 4. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 5. if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
old&mutexStarving == 0mutexLockedmutexStarvingmutexLocked被唤醒过且抢锁失败
更新锁状态
// 1. if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 3. runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 4. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 5. if old&mutexStarving != 0 { / if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } }
mutexLockedwaitStartTimeruntime_SemacquireMutexruntime_SemacquireMutexruntime_SemacquireMutexstarvingstarvingstarvationThresholdNsstarving
之后会按照饥饿模式与正常模式,走不同的逻辑
- - 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
- - 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
解锁
func (m *Mutex) Unlock() { // 1. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 2. m.unlockSlow(new) } }
unlockSlow
func (m *Mutex) unlockSlow(new int32) { // 1. if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // 2. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 2.1. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 2.2. runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 3. runtime_Semrelease(&m.sema, true, 1) } }
new+mutexLocked
2. 如果等待者数量等于0,或者锁的状态已经变为mutexWoken、mutexStarving、mutexStarving,则直接返回
- 将waiterCount数量-1,尝试选择一个goroutine唤醒
- 尝试更新锁状态,如果更新锁状态成功,则唤醒队尾的一个gorountine
2
可能遇到的问题
锁拷贝
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock()
mu2mu1
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock() mu1.Unlock()
可以看到发生了error
panic导致没有unlock
当lock()之后,可能由于代码问题导致程序发生了panic,那么mutex无法被及时unlock(),由于其他协程还在等待锁,此时可能触发死锁
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums个协程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { log.Println("recover") } wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ safeSlice.lock.Unlock() }() } wg.Wait() log.Println(len(safeSlice.s)) }
修改:
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums个协程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { } safeSlice.lock.Unlock() wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ }() } wg.Wait() log.Println(len(safeSlice.s)) }