Mutex

结构体

type Mutex struct {
	state int32 //表示互斥锁的状态,比如是否被锁定等。
	sema  uint32//表示互斥锁的状态,比如是否被锁定等。
}
const (
   mutexLocked = 1 << iota // 表示互斥锁的锁定状态
   mutexWoken // 表示从正常模式被从唤醒
   mutexStarving // 当前的互斥锁进入饥饿状态
   mutexWaiterShift = iota // 当前互斥锁上等待者的数量
)

Mutex.state是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态。
在这里插入图片描述

  • Locked: 表示该Mutex是否已被锁定,0:没有锁定 1:已被锁定。
  • Woken: 表示是否有协程已被唤醒,0:没有协程唤醒1:已有协程唤醒,正在加锁过程中。
  • Starving:表示该Mutex是否处于饥饿状态,0:没有饥饿1:饥饿状态,说明有协程阻塞了超过1ms。
  • Waiter: 表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。

协程之间抢锁实际上是抢给Locked位赋值的权利,能给Locked位置1,就说明抢锁成功。抢不到的话就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。

Mutex方法

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock() //加锁方法
	Unlock()//解锁方法
}

简单加锁

在这里插入图片描述
加锁过程会去判断Locked标志位是否为0,如果是0则把Locked位置1,代表加锁成功。从上图可见,加锁成功后,只是Locked位置1,其他状态位没发生变化。

sync_runtime_canSpin(i int)

自旋条件如下:

  • 自旋的次数要在4次以内
  • CPU必须为多核
  • GOMAXPROCS>1
  • 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
const active_spin     = 4
func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
  return false
 }
 return true
}

runtime_doSpin

循环次数被设置为30次,自旋操作就是执行30次PAUSE指令,通过该指令占用CPU并消费CPU时间,进行忙等待;

这就是整个自旋操作的逻辑,这个就是为了优化 等待阻塞->唤醒->参与抢占锁这个过程不高效,所以使用自旋进行优化,在期望在这个过程中锁被释放。

const active_spin_cnt = 30
func sync_runtime_doSpin() {
 procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
 MOVL cycles+0(FP), AX
again:
 PAUSE
 SUBL $1, AX
 JNZ again
 RET

lockSlow

func (m *Mutex) lockSlow() {
	var waitStartTime int64 //来计算waiter的等待时间
	starving := false //是饥饿模式标志,如果等待时长超过1ms,starving置为true,后续操作会把Mutex也标记为饥饿状态。
	awoke := false//表示协程是否唤醒
	iter := 0//用于记录协程的自旋次数
	old := m.state//记录当前锁的状态
	for {
		//判断是否允许进入自旋 两个条件:
		//1.old&(mutexLocked|mutexStarving) == mutexLocked ==》当前锁不能处于饥饿状态(用来判断锁是否处于正常模式且加锁)
		//mutexLocked 二进制表示为 0001
		//mutexStarving 二进制表示为 0100
		//mutexLocked|mutexStarving 二进制为 0101. 使用0101在当前状态做 &操作,如果当前处于饥饿模式,低三位一定会是1,如果当前处于加锁模式,低1位一定会是1,所以使用该方法就可以判断出当前锁是否处于正常模式且加锁;
		//2.runtime_canSpin(iter)==》其逻辑是在多核CPU运行,自旋的次数小于4
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke 判断当前goroutine不是在唤醒状态
// old&mutexWoken == 0 表示没有其他正在唤醒的goroutine
// old>>mutexWaiterShift != 0 表示等待队列中有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将当前锁的低2位的Woken状态位设置为1,表示已被唤醒, 这是为了通知在解锁Unlock()中不要再唤醒其他的waiter了
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			//判断当前goroutine可以进自旋后,调用runtime_doSpin方法进行自旋:
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
//--------------------抢锁准备期望状态------------------------
//自旋逻辑处理好后开始根据上下文计算当前互斥锁最新的状态,根据不同的条件来计算mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
//首先计算mutexLocked的值:
// 基于old状态声明到一个新状态
		new := old
// 新状态处于非饥饿的条件下才可以加锁
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
//计算mutexWaiterShift的值:
//如果old已经处于加锁或者饥饿状态,则等待者按照FIFO的顺序排队
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
//计算mutexStarving的值:
 如果当前锁处于饥饿模式,并且已被加锁,则将低3位的Starving状态位设置为1,表示饥饿
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
//计算mutexWoken的值:	
// 当前goroutine的waiter被唤醒,则重置flag	
		if awoke {
   // 唤醒状态不一致,直接抛出异常
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
// 新状态清除唤醒标记,因为后面的goroutine只会阻塞或者抢锁成功
// 如果是挂起状态,那就需要等待其他释放锁的goroutine来唤醒。
// 假如其他goroutine在unlock的时候发现Woken的位置不是0,则就不会去唤醒,那该goroutine就无法在被唤醒后加锁
			new &^= mutexWoken
		}
//-----------------------通过CAS操作更新期望状态------------------------------------------
//这块的逻辑很复杂,通过CAS来判断是否获取到锁,没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex通过信号量保证资源不会被
//两个 goroutine 获取,runtime.sync_runtime_SemacquireMutex会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 goroutine 可
//以获取信号量,它就会立刻返回,如果是新来的goroutine,就需要放在队尾;如果是被唤醒的等待锁的goroutine,就放在队头,整个过程还需要啃代码来加深理解。
//上面我们已经得到了锁的期望状态,接下来通过CAS将锁的状态进行更新:
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
		  // 如果原来锁的状态是没有加锁的并且不处于饥饿状态,则表示当前goroutine已经获取到锁了,直接推出即可
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
   // 到这里就表示goroutine还没有获取到锁,waitStartTime是goroutine开始等待的时间,waitStartTime != 0就表示当前goroutine已经等待过了,则需要将其放置在等待队列队头,否则就排到队列队尾
   			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 阻塞等待
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	 // 被信号量唤醒后检查当前goroutine是否应该表示为饥饿
     // 1. 当前goroutine已经饥饿
     // 2. goroutine已经等待了1ms以上
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			 // 再次获取当前锁的状态
			old = m.state
			 // 如果当前处于饥饿模式,
			if old&mutexStarving != 0 {
// 如果当前锁既不是被获取也不是被唤醒状态,或者等待队列为空 这代表锁状态产生了不一致的问题
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 当前goroutine已经获取了锁,等待队列-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
			// 当前goroutine非饥饿状态 或者 等待队列只剩下一个waiter,则退出饥饿模式(清除饥饿标识位)              
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				// 更新状态值并中止for循环,拿到锁退出
				atomic.AddInt32(&m.state, delta)
				break
			}
			// 设置当前goroutine为唤醒状态,且重置自璇次数
			awoke = true
			iter = 0
		} else {
		 // 锁被其他goroutine占用了,还原状态继续for循环
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

加锁被阻塞

假定加锁时,锁已被其他协程占用了,此时加锁过程如下图所示:
在这里插入图片描述
从上图可看到,当协程B对一个已被占用的锁再次加锁时,Waiter计数器增加了1,此时协程B将被阻塞,直到Locked值变为0后才会被唤醒。

简单解锁

假定解锁时,没有其他协程阻塞,此时解锁过程如下图所示:
在这里插入图片描述
由于没有其他协程阻塞等待加锁,所以此时解锁时只需要把Locked位置为0即可,不需要释放信号量。

解锁并唤醒协程

假定解锁时,有1个或多个协程阻塞,此时解锁过程如下图所示:

在这里插入图片描述

Unlock

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

//使用AddInt32方法快速进行解锁,将m.state的低1位置为0,
	new := atomic.AddInt32(&m.state, -mutexLocked)
	//然后判断新的m.state值,如果值为0,则代表当前锁已经完全空闲了,结束解锁,不等于0说明当前锁没有被占用
	if new != 0 {
//会有等待的goroutine还未被唤醒,需要进行一系列唤醒操作,这部分逻辑就在unlockSlow方法内:
		m.unlockSlow(new)
	}
}

TryLock 非阻塞加锁

TryLock的实现就比较简单了,主要就是两个判断逻辑:

  • 判断当前锁的状态,如果锁处于加锁状态或饥饿状态直接获取锁失败
  • 尝试获取锁,获取失败直接获取锁失败
    TryLock并不被鼓励使用,至少我还没想到有什么场景可以使用到它。
func (m *Mutex) TryLock() bool {
// // 记录当前状态
	old := m.state
	  处于加锁状态/饥饿状态直接获取锁失败

	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}
	 // 尝试获取锁,获取失败直接获取失败
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
	return true
}

unlockSlow

我们在唤醒goroutine时正常模式/饥饿模式都调用func runtime_Semrelease(s *uint32, handoff bool, skipframes int),这两种模式在第二个参数的传参上不同,如果handoff is true, pass count directly to the first waiter.。

func (m *Mutex) unlockSlow(new int32) {
/// 这里表示解锁了一个没有上锁的锁,则直接发生panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	 正常模式的释放锁逻辑
	if new&mutexStarving == 0 {
		old := new
		for {
	// 如果没有等待者则直接返回即可
      // 如果锁处于加锁的状态,表示已经有goroutine获取到了锁,可以返回
      // 如果锁处于唤醒状态,这表明有等待的goroutine被唤醒了,不用尝试获取其他goroutine了
      // 如果锁处于饥饿模式,锁之后会直接给等待队头goroutine
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
// 抢占唤醒标志位,这里是想要把锁的状态设置为被唤醒,然后waiter队列-1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 抢占成功唤醒一个goroutine
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			 // 执行抢占不成功时重新更新一下状态信息,下次for循环继续处理
			old = m.state
		}
	} else {
// 饥饿模式释放锁逻辑,直接唤醒等待队列goroutine
		runtime_Semrelease(&m.sema, true, 1)
	}
}

协程A解锁过程分为两个步骤,一是把Locked位置0,二是查看到Waiter>0,所以释放一个信号量,唤醒一个阻塞的协程,被唤醒的协程B把Locked位置1,于是协程B获得锁。

自旋

什么是自旋

自旋对应于CPU的”PAUSE”指令,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期。
自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态。

自旋条件

加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。
自旋必须满足以下所有条件:

  • 自旋次数要足够小,通常为4,即自旋最多4次
  • CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
  • 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋
  • 协程调度机制中的可运行队列必须为空,否则会延迟协程调度

可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。

自旋的优势

自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。

自旋的问题

如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。
为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。

Starving位

每个Mutex都有两个模式,称为Normal和Starving。

Normal

默认情况下,Mutex的模式为normal。

该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁。

starvation模式

自旋过程中能抢到锁,一定意味着同一时刻有协程释放了锁,我们知道释放锁时如果发现有阻塞等待的协程,还会释放一个信号量来唤醒一个等待协程,被唤醒的协程得到CPU后开始运行,此时发现锁已被抢占了,自己只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞经过了多长时间,如果超过1ms的话,会将Mutex标记为”饥饿”模式,然后再阻塞。
但是这个时候的starvation模式,要在下一次相同情况时(有别的协程在自旋过程中刚好锁被释放),才会体现starvation模式的作用。即别的协程就算在自旋,也不能获得锁,只有去阻塞,因为当前处于Mutex饥饿状态。

处于饥饿模式下,不会启动自旋过程,也即一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取锁,同时也会把等待计数减1。

为什么重复解锁要panic

可能你会想,为什么Go不能实现得更健壮些,多次执行Unlock()也不要panic?

仔细想想Unlock的逻辑就可以理解,这实际上很难做到。Unlock过程分为将Locked置为0,然后判断Waiter值,如果值>0,则释放信号量。

如果多次Unlock(),那么可能每次都释放一个信号量,这样会唤醒多个协程,多个协程唤醒后会继续在Lock()的逻辑里抢锁,势必会增加Lock()实现的复杂度,也会引起不必要的协程切换。

总结
  • 互斥锁有两种模式:正常模式、饥饿模式,饥饿模式的出现是为了优化正常模式下刚被唤起的goroutine与新创建的goroutine竞争时长时间获取不到锁,在Go1.9时引入饥饿模式,如果一个goroutine获取锁失败超过1ms,则会将Mutex切换为饥饿模式,如果一个goroutine获得了锁,并且他在等待队列队尾 或者 他等待小于1ms,则会将Mutex的模式切换回正常模式
  • 加锁的过程:
  1. 锁处于完全空闲状态,通过CAS直接加锁
  2. 当锁处于正常模式、加锁状态下,并且符合自旋条件,则会尝试最多4次的自旋
  3. 若当前goroutine不满足自旋条件时,计算当前goroutine的锁期望状态
  4. 尝试使用CAS更新锁状态,若更新锁状态成功判断当前goroutine是否可以获取到锁,获取到锁直接退出即可,若不同获取到锁子则陷入睡眠,等待被唤醒
  5. goroutine被唤醒后,如果锁处于饥饿模式,则直接拿到锁,否则重置自旋次数、标志唤醒位,重新走for循环自旋、获取锁逻辑;
  • 解锁的过程
  1. 原子操作mutexLocked,如果锁为完全空闲状态,直接解锁成功
  2. 如果锁不是完全空闲状态,,那么进入unlockedslow逻辑
  3. 如果解锁一个未上锁的锁直接panic,因为没加锁mutexLocked的值为0,解锁时进行mutexLocked - 1操作,这个操作会让整个互斥锁魂村,所以需要有这个判断
  4. 如果锁处于饥饿模式直接唤醒等待队列队头的waiter
  5. 如果锁处于正常模式下,没有等待的goroutine可以直接退出,如果锁已经处于锁定状态、唤醒状态、饥饿模式则可以直接退出,因为已经有被唤醒的 goroutine 获得了锁.
  • 使用互斥锁时切记拷贝Mutex,因为拷贝Mutex时会连带状态一起拷贝,因为Lock时只有锁在完全空闲时才会获取锁成功,拷贝时连带状态一起拷贝后,会造成死锁
  • TryLock的实现逻辑很简单,主要判断当前锁处于加锁状态、饥饿模式就会直接获取锁失败,尝试获取锁失败直接返回;
使用defer避免死锁 加锁和解锁应该成对出现 RWMutex
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}