互斥锁

1、互斥锁概念

  • 对于同一份资源来说,为了保证 在多个线程或者协程同时访问的时候,保证数据的读取不会出现错误。
  • 互斥锁相对于 读写锁而言更加绝对一些,互斥锁不论是读写,加过锁后,都是互斥的,需要等待

2、Mutex 数据结构

type Mutex struct {
	state int32		// 互斥锁状态
	sema  uint32	// 信号量
}
  • sema 表示信号量,用来控制 goroutine 的 阻塞休眠和唤醒。协程阻塞时等待该信号量,协程解锁是释放该信号量并唤醒等待信号量的协程
  • state 表示 互斥锁的状态,0表示 未上锁,1表示上锁状态

3、state 状态组成

state 一共32位,最低三位依次是 Locked、Woken、Starving,剩下的其他位表示 等待锁的goroutine

在这里插入图片描述

  • Waiter 表示等待锁释放的goroutine数量
  • starving 表示当前锁释放处于饥饿状态,0 不饿 1 饥饿。饥饿状态说是有协程阻塞超过了1ms
  • woken 表示唤醒状态,0 没唤醒,1 唤醒状态。唤醒状态表示正在加锁
  • locked 表述是否锁定 0 未锁定 1 已锁定

协程之间抢占锁实际就是看谁能给locked 赋值。赋值为1说明抢占成功,抢不到的话就是0.
抢不到的话就等待 sema 的信号量,一旦有协程解锁,等待的协程就会被依次唤醒
woken 和 starving 主要用户控制协程之间的抢锁过程

4、Mutex 的方法

在这里插入图片描述

Mutex 对外仅暴露两个方法,lock 和 unlock。加红锁的是私有的方法

Lock

加锁过程

1、无阻塞加锁图解

在这里插入图片描述

当前锁是空闲状态,且没有协程抢占。
加锁前判断Locked 是否是 0 , 如果是0 的话 改为 1 ,加锁成功。
仅 Locked 状态位变为1 ,其他位置不变化

2、阻塞加锁图解

在这里插入图片描述

协程B加锁的时候,发现该锁已经被协程A占用了
此时 Waiter 计数器会增加1,协程B 被阻塞,知道协程A释放锁。Locked 变为0 ,协程B才会被唤醒

加锁代码

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
  • 首先使用 CAS 方法(CompareAndSwapInt32)判断是否可以获取资源。如果可以获取资源的话,就直接修改 Locked 的值,如果获取不到的话就 执行 lockSlow() 方法,自旋抢锁,直到拿到锁
1、CompareAndSwapInt32 方法介绍
  • CAS 是原子操作的一种,是在包 sync.atomic 下面

  • 原子操作即是进行过程中不能被中断的操作。针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。
    为了实现这样的严谨性,原子操作仅会由一个独立的CPU指令代表和完成
    在这里插入图片描述

  • 对应的其他方法还有 增或减(add)、比较并交换(CompareAndSwap)、载入(Load)、存储(Store)、交换(Swap)

  • 对应的操作类型 :int32,int64,uint32,uint64,uintptr,unsafe.Pointer

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
2、lockSlow 方法

加锁失败后就会进入 lockSlow 方法,自旋抢锁

func (m *Mutex) lockSlow() {
	var waitStartTime int64 	// 协程等待时间
	starving := false 			// 锁的模式
	awoke := false				// 循环标记
	iter := 0					// 计数器
	old := m.state				// 当前锁的状态
	for {
		// 当old&0101 == 0001 等于 1 就是已经在锁定状态,第一位是1,第三位是0,未处于饥饿状态,开始自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// woken 未更新,也就是未唤醒,woken 仍然为0
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			// 将当前协程标识为 唤醒状态。执行自旋操作,计数器+1,当前状态更新到 OLD
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// 新来的协程,需要排队,第三位为0
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// 当 old 的 第1和3 位为1 时,为饥饿模式,需要排队
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		// 切换到饥饿模式,解锁时不需要
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		// 唤醒
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			// 互斥锁状态不同需要 panic
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			// 同时清除 唤醒状态位
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
		// 如果 old 的 第一位不是1,且处于饥饿状态,获取锁成功
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			// 唤醒后抢锁失败了,重新放到队列的首部
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 进入休眠状态,等到信号唤醒
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 确认当前锁的状态
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// 饥饿模式不会出现mutex 被锁住,等待队列部位0
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 拿到锁后,等待数-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 非饥饿模式,等待者只有一个时,退出饥饿模式
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				// 更新状态,高位原子计数,直接添加
				atomic.AddInt32(&m.state, delta)
				break
			}
			// awoke=true,不处于饥饿模式,新到达的协程先获得锁
			awoke = true
			iter = 0
		} else {
		// old = m.state,自旋没成功,更新new,记录当前的状态
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

自旋

1、概念
  • 旋箱相当于 CPU 的 “PAUSE”,CPU对该指令什么都不做,相当于CPU空转,对程序而言相当于sleep了一小段时间,时间非常短,当前实现是30个时钟周期

  • 自旋过程中会持续探测Locked是否变为0,连续两次探测间隔就是执行这些PAUSE指令,它不同于sleep,不需要将协程转为睡眠状态

2、自旋过程
  • 加锁时,如果当前Locked位为1,说明该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测Locked位是否变为0,这个过程即为自旋过程。

  • 自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。此时即便有协程被唤醒也无法获取锁,只能再次阻塞。

  • 自旋的好处是,当加锁失败时不必立即转入阻塞,有一定机会获取到锁,这样可以避免协程的切换。

3、自旋的优势
  • 自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。
4、自旋的条件
  • 加锁时程序会自动判断是否可以自旋,无限制的自旋将会给CPU带来巨大压力,所以判断是否可以自旋就很重要了。

    自旋必须满足以下所有条件:

  • 自旋次数要足够小,通常为4,即自旋最多4次
    -CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁

  • 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启用自旋

  • 协程调度机制中的可运行队列必须为空,否则会延迟协程调度

  • 可见,自旋的条件是很苛刻的,总而言之就是不忙的时候才会启用自旋。

5、加锁失败后进入自旋可能存在的问题
  • 如果自旋过程中获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的进程将很难获得锁,从而进入饥饿状态。

  • 为了避免协程长时间无法获取锁,自1.8版本以来增加了一个状态,即Mutex的Starving状态。这个状态下不会自旋,一旦有协程释放锁,那么一定会唤醒一个协程并成功加锁。

unLock

解锁过程

1、无阻塞解锁

在这里插入图片描述

  • 没有其他协程阻塞等待加锁时,此时直接把 Locked 位置改为0即可,不需要等待释放信号量
  • 这种情况 仅改变 Locked 一个标志位
2、阻塞解锁

在这里插入图片描述

  • 当解锁时观察到 Waiter 是大于0 的,说明有协程在等待锁的释放。
  • 先将锁的 Locked 改为0,表示解锁。然后释放信号量,唤醒在等待的协程,被唤醒的协程进行加锁,将 Locked 改为1,表示再次加锁成功
  • 这种阻塞的解锁时,需要更做两件事情,一个是修改Locked 另一个是释放信号量

解锁代码

1、 unLock
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

解锁时先更新 第一位锁标识 Locked 为0,如果更新失败,会进入方法 unlockSlow 进行解锁操作

2、unlockSlow
func (m *Mutex) unlockSlow(new int32) {
// 当锁没有上锁时,Locked 为 0 ,panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	// 当处于饥饿模式下,直接唤起等待队列的首个协程
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			// 如果没有在等待的协程,直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			// 等待者数量减一,将唤醒位改为1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// 饥饿模式下,将持有锁交给下一个等待者,此时mutexLocked还为0,但是在饥饿模式下,新协程不会更新mutexLocked位
		runtime_Semrelease(&m.sema, true, 1)
	}
}

5、概念补充

5.1 normal 模式

  • 默认情况下,Mutex的模式为normal
  • 该模式下,协程如果加锁不成功不会立即转入阻塞排队,而是判断是否满足自旋的条件,如果满足则会启动自旋过程,尝试抢锁

5.2 starvation 模式

  • 如果一个协程在等待解锁,结果,每次一释放锁就被 自旋 机制的协程抢走,再次进去阻塞。一直无法获取锁。
  • 阻塞前会判断 本次阻塞经过了多长时间,如果超过 1ms,就会进入饥饿模式,然后再阻塞
  • 在饥饿模式下,不会进入 自旋,就是一旦有协程释放了锁,就会唤醒等待队列中得协程,同时把 Waiter 数量减一

5.3 Woken 状态

  • woken 用于解锁和加锁之间的通信。
  • 当一个协程加锁失败进入自旋时,会将 Woken 置为1
  • 此时如果正好有一个协程在解锁。就会通知解锁的协程,仅仅解锁就可以了,不必要释放信号量,我自己会通过自旋获取锁