Go语言中的锁简单易用,本文整理一下锁的实现原理。
Golang中锁有两种,互斥锁Mutex和读写互斥锁RWMutex,互斥锁也叫读锁,读写锁也叫读锁,相互之间的关系为:
-
写锁需要阻塞写锁:一个协程拥有写锁时,其他协程写锁定需要阻塞
-
写锁需要阻塞读锁:一个协程拥有写锁时,其他协程读锁定需要阻塞
-
读锁需要阻塞写锁:一个协程拥有读锁时,其他协程写锁定需要阻塞
-
读锁不能阻塞读锁:一个协程拥有读锁时,其他协程也可以拥有读锁
1.使用
互斥锁和读写锁在使用上没有很大区别
-
互斥锁使用Lock()进行加锁,使用Unlock()进行解锁
-
读写锁使用RLock()加读锁,使用RUnlock()进行解读锁;使用Lock()加写锁,使用Unlock解写锁,和互斥锁功能一致;
但两者使用场景不同:
-
互斥锁会将操作串行化,可以保证操作完全有序,适合资源只能由一个协程进行操作的情况,并发能力弱;
-
读写锁适合读多写少的情况,并发能有比较强。
package main
import (
"fmt"
"sync"
"time"
)
/**
* @Author: Jason Pang
* @Description: 测试互斥锁
*/
func testMutex() {
count := 0
var l sync.Mutex
for i := 0; i < 10; i++ {
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("---------互斥锁", count)
count++
}()
}
}
/**
* @Author: Jason Pang
* @Description: 测试读写锁
*/
func testRWMutex() {
count := 0
var l sync.RWMutex
for i := 0; i < 10; i++ {
go func() {
l.RLock()
defer l.RUnlock()
fmt.Println("---------读写互斥锁", count)
count++
}()
}
}
func main() {
testMutex()
testRWMutex()
time.Sleep(10 * time.Second)
}
输出:
可以看出使用互斥锁后,count值是顺序增加的,而使用读写互斥锁,数据则是无序的。
2.基础知识
讲锁的具体实现原理之前,需要先复习几个基础知识:进程同步、信号量和自旋。
2.1进程同步
进程同步本质上是靠控制对临界区的访问权限实现的。
-
临界资源:把在一段时间内只允许一个进程访问的资源称为临界资源或独占资源。计算机系统中的大多数物理设备,以及某些软件中所用的栈、变量和表格,都属于临界资源, 它们要求被互斥地共享
-
临界区:在每个进程中访问临界资源的那段代码称为临界区(critical section)。若能保证诸进程互斥地进入自己的临界区,便可实现诸进程对临界资源的互斥访问。
-
进入区(entry section):如果此刻该临界资源未被访问,进程便可进入临界区对该资源进行访问,并设置它正被访问的标志;如果此刻该临界资源正被某进程访问,则本进程不能进入临界区。
-
退出区(exit section):用于将临界区正被访问的标志恢复为未被访问的标志。
-
同步机制规则
(1) 空闲让进。当无进程处于临界区时,表明临界资源处于空闲状态,应允许一个请求进入临界区的进程立即进入自己的临界区,以有效地利用临界资源。
(2) 忙则等待。当已有进程进入临界区时,表明临界资源正在被访问,因而其它试图进 入临界区的进程必须等待,以保证对临界资源的互斥访问。
(3) 有限等待。对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区, 以免陷入“死等”状态。
(4) 让权等待。当进程不能进入自己的临界区时,应立即释放处理机,以免进程陷入“忙等”状态。
这个规则和现实一致:如果有空闲我就可以用吧(空闲让进);如果不空闲,为了有序我可以等待(忙则等待);我等待的时候没别的事情可以做,那可以去一边休息吧(让权等待);你们不能让我老等着吧(有限等待);
2.2信号量
1965 年,荷兰学者 Dijkstra 提出的信号量(Semaphores)机制是一种卓有成效的进程同步工具。Dijkstra,YYDS。
2.2.1类型
信号量现在已发展为如下四种类型:
-
整型信号量
-
记录型信号量
-
AND型信号量
-
信号量集
虽然信号量有不同类型,但核心是对:一个用于表示资源数目的整型量 S,仅能通过两个标准的原子操作(Atomic Operation) wait(S)和 signal(S)来访问。wait用于将S值变小,signal用于将S值增加,伪代码如下:
wait(S):while S<=0 do no-op;
S:=S-1;
signal(S):S:=S+1;
2.2.2应用
-
利用信号量实现进程互斥
为使多个进程能互斥地访问某临界资源,只须为该资源设置一互斥信号量 mutex,并设其初始值为 1,然后将各进程访问该资源的临界区(critical section)置于 wait(mutex)和 signal(mutex)操作之间即可。
-
利用信号量实现前驱关系
设有两个并发执行的进程 P1 和 P2。P1 中有语句 S1;P2 中有语句 S2。我们希望在 S1 执行后再执行 S2。为实现这种前趋关系,我们只须使进程 P1 和 P2 共享一个公用信号量 S,并赋予其初值为 0,将 signal(S)操作放在语句 S1 后面;而在 S2 语句前面插入 wait(S)操作,即
在进程 P1 中,用 S1;signal(S);
在进程 P2 中,用 wait(S);S2;
2.3自旋
同步机制规则里有让权等待,自旋其实就是说在进程不能进入自己的临界区时是如何处理的。
2.3.1定义
加锁时,如果发现该锁当前由其他协程持有,尝试加锁的协程并不是马上转入阻塞,而是会持续的探测锁是否被释放,这个过程即为自旋过程。自旋时间很短,但如果在自旋过程中发现锁已被释放,那么协程可以立即获取锁。
2.3.2过程
自旋过程为先检查是否可以加锁,如果不可以,执行CPU PAUSE指令,CPU对该指令什么都不做,一般为30个时钟周期。PAUSE执行后,再检查是否可以加锁,循环往复。
在这个过程中,进程仍然是执行状态,不是睡眠状态。
2.3.3优势
**自旋主要为了更加高效,减少损耗。**自旋的优势是更充分的利用CPU,尽量避免协程切换。因为当前申请加锁的协程拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。
2.3.4条件
自旋不能随便使用,否则不但发挥不了优势,还会带来更多损耗,举个简单的例子:如果自旋次数不限制,而获得锁的进程很长时间后才释放锁,则自旋的进程这段时间CPU完全浪费了。
所以使用自旋,一定要满足一下条件:
-
自旋次数要足够小,通常为4,即自旋最多4次
-
CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
-
调度机制中的Process数量要大于1,否则自旋没有意义
-
调度机制中的可运行队列必须为空,否则会延迟协程调度,需要把CPU让给更需要的进程
2.3.5问题
自旋有个特性,无视正在排队等待加锁的进程,在自旋过程中,获取到锁便可加锁,类似于插队。
所以使用自旋会引发问题:极端情况下,很多进程正排队等待加锁,此时有进程刚到,开始自旋加锁,如果成功,该进程便插队成功加锁。如果此时不断有进程自旋加锁,则在排队的进程将长时间无法获取到锁。
一般解决方案为:锁添加饥饿状态,该状态下不允许自旋。
3.实现原理
3.1互斥锁Mutex
3.1.1 结构体
Mutex结构体如下:
// A Mutex must not be copied after first use.
// Mutex被使用后,不可以将其复制。(意思是不能复制值,可以做成引用复制)
// 复制容易导致非预期的死锁,https://mozillazg.com/2019/04/notes-about-go-lock-mutex.html#hidcopy
type Mutex struct {
state int32
sema uint32
}
-
state表示互斥锁的状态,比如是否被锁定等。
-
sema表示信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。
state是32位的整型变量,内部实现时把该变量分成四份,用于记录Mutex的四种状态。
const (
mutexLocked = 1 << iota //值1
mutexWoken //值2
mutexStarving //值4
)
Locked: 表示该Mutex是否已被锁定,0:没有锁定 1:已被锁定。
Woken: 表示是否有协程已被唤醒,0:没有协程唤醒 1:已有协程唤醒,正在加锁过程中。释放锁时,如果正常模式下,不会再唤醒其它协程。
Starving:表示该Mutex是否处理饥饿状态, 0:没有饥饿 1:饥饿状态,说明有协程阻塞了超过1ms。
Waiter: 表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量
协程之间抢锁实际上是抢给Locked赋值的权利,能给Locked域置1,就说明抢锁成功。抢不到的话就阻塞等待
Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。
3.1.2 互斥公平
go1.13中,讲述starving、woken、locked是如何使用的,对原文进行翻译
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
互斥量有两种模式:正常模式和饥饿模式。
正常模式:正常模式下等待的协程按照先入先出排列,当一个协程被唤醒后并不是直接拥有锁,该协程需要和刚刚到达的协程一起竞争锁的所有权。新到的协程有个优势,那就是它已经在CPU上运行了,而且新到的协程可能有很多,所以被唤醒的协程极有可能抢占不到锁。在这种情况下,被唤醒的协程会被放置于等待队列的队头。如果等待的协程超过1ms内没有获取到锁,将会把锁置为饥饿模式。
饥饿模式:在饥饿模式下,解锁的协程会将锁的所有权直接交给等待队列中位于队头的协程。正好解锁的那一刻有新的协程到达,新到达的协程也不会尝试自旋获取锁。相反,他们会将自己置于等待队列的队尾。
如果等待队列中的协程获取到锁,它会查看
(1)自己是否是等待队列中最后一个协程
(2)自己等待的时间是否小于1ms
如果有任意一个条件满足,将会将锁改为普通模式。
一般认为普通模式会有更好的性能,因为即使有等待的协程,新的协程可以连续获取到锁。饥饿模式能够防止等待协程长时间获取不到锁。
3.1.3 Lock
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
// 如果锁已经被占用了,则将调用Lock的协程阻塞,知道锁被释放
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 如果锁即没被占用、也不是饥饿状态、也没有唤醒协程、也没有等待的协程,直接加锁成功
// 这是比较完美的一种状态
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled { //默认是false,所以可以不用管
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 如果是正常模式且锁被抢占了,自己符合自旋条件,就自旋
// 因为按照规定,饥饿模式下需要保证等待队列中的协程能够获得锁的所有权,防止等待队列饿死
// 如果锁变为饥饿状态或者已经解锁了,或者不符合自旋条件了就结束自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 如果等待队列有协程、锁没有设置唤醒状态,就努力设置唤醒状态
// 这么做的好处是,当锁解锁的时候,不会去唤醒已经阻塞的协程,保证自己更大概率获取到锁
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 此处说明锁变为饥饿状态或者已经解锁了,或者不符合自旋条件了(仍为锁定状态)
// 锁状态包含-饥饿锁定、饥饿未锁定、正常锁定、正常未锁定
// 获取锁最新的状态
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果是正常状态,尝试加锁。饥饿状态下要出让竞争权利,肯定不能加锁的
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果锁还是被占用的或者锁是饥饿状态,只能将自己放到等待队列上
// 到了这个阶段,遇到这些状态,协程只能躺平。饥饿状态要出让竞争权利
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果自身已经到饥饿状态了,而且锁是被占用情况下,将锁改为饥饿状态
// 锁未被占用不能改为饥饿模式,是因为如果锁没有被占用,但是锁是饥饿状态,那应该有等待队列。
// 如果锁未被占用却改为饥饿状态,违背了这个条件。(不是很明白这句话)
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果该协程设置锁的唤醒状态,需要将唤醒状态进行重置。
// 因为改协程要么获得了锁、要么进入休眠,都和唤醒状态无关了
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// old -> new
// (0,1)正常且已锁定 -> (+1,1?,1) 等待加一,状态待定,加锁 -> 加到等待队列
// (0,0)正常且未锁定 -> (+0,0 ,1) 等待不变,正常状态,加锁 -> 加锁成功
// (1,1)饥饿且已锁定 -> (+1,1?,1) 等待加一,饥饿待定,加锁 -> 加到等待队列
// (1,0)饥饿且未锁定 -> (+1,1 ,0) 等待加一,饥饿状态,不加锁 -> 加到等待队列
if atomic.CompareAndSwapInt32(&m.state, old, new) {//如果CAS成功
//如果锁为未锁定且正常状态,表明占有锁成功,加锁操作完毕
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 走到此处,说明协程没有获取到锁,调用runtime_SemacquireMutex,将该协程挂起
// waitStartTime能够判断该协程是新来的还是被唤醒的
// 如果是新来的,则加入队列尾部,等待唤醒,queueLifo=false
// 如果是从等待队列中唤醒的,则加入队列头部,queueLifo=true
// 如果后面该协程被唤醒,就从该位置继续往下执行
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 此刻说明该协程被唤醒了
// 判断该协程是否长时间没有获取到锁,如果是的话,就是饥饿的协程
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 协程被挂起的时间有点长,需要重新获取一下当前锁的状态
old = m.state
// 表示当前是饥饿状态的情况。按照设定,饥饿状态下,被唤醒的协程直接获得锁。
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
// 饥饿状态下,我被唤醒,结果发现锁没释放、唤醒值是1、等待列表没有协程了(不把我算作协程了)
// 不符合设定,果断有问题啊
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 值是7,因为此时锁状态为未锁定,使用7可以达到等待数量减一,同时将锁设置为锁定的效果
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果唤醒等待队列的协程不饥饿、或者这个协程是等待队列中最后一个协程,就改为正常状态
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
// 将锁状态设置为等待数量减一,同时设置为锁定。加锁成功
// 这个计算方法太秀了
atomic.AddInt32(&m.state, delta)
break
}
// 本协程千真万确就是被系统唤醒的协程
awoke = true
// 自旋次数重置为0
iter = 0
} else { //如果CAS失败,则重新开始
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
3.1.3 UnLock
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
// 如果在解锁的时候,锁是没有被锁定的,则报运行时错误。
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
// 加锁和解锁可以不是同一个协程
func (m *Mutex) Unlock() {
if race.Enabled { //默认是false
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 不是饥饿状态,没有等待的协程、没有唤醒,直接解锁完毕
new := atomic.AddInt32(&m.state, -mutexLocked)
// 说明可能为饥饿状态、有等待协程、有唤醒的协程,事情没处理完,还得继续处理
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//如果是正常模式
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果等待列表里没有协程了,或者已经有唤醒的协程了,就无需浪费精力唤醒其它协程了
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 等待协程数量减1,并将锁的唤醒位置为1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {//如果是饥饿模式
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式下,直接将锁的所有权交给等待队列中的第一个
// 注意:锁的locked位没有被设置,唤醒的协程后面会进行设置
// 尽管没有设置locked位,但是在饥饿模式下,新来的协程也是无法获取到锁的。
runtime_Semrelease(&m.sema, true, 1)
}
}
3.1.4 函数说明
【runtime_canSpin】:在 src/runtime/proc.go 中被实现 sync_runtime_canSpin;表示是否可以保守的自旋,golang中自旋锁并不会一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制, 传递过来的iter大等于4或者cpu核数小等于1,最大逻辑处理器大于1,至少有个本地的P队列,并且本地的P队列可运行G队列为空。
【runtime_doSpin】:在 src/runtime/proc.go 中被实现 sync_runtime_doSpin;表示 会调用procyield函数, 该函数也是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做, 但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。
【runtime_SemacquireMutex】:在 src/runtime/sema.go 中被实现 sync_runtime_SemacquireMutex;表示通过信号量阻塞当前协程 。
【runtime_Semrelease】: 在src/runtime/sema.go 中被实现 sync_runtime_Semrelease。表示通过信号量解除当前协程阻塞。
3.1.5 流程图
https://www.processon.com/view/link/60f4e1021e085376da5c05f8
Go互斥锁实现逻辑很复杂,能够看到大量的性能优化方面的代码,所以导致整个逻辑很难理解。大家即使看了注释和流程图,理解起来应该还是会有些困难。我的建议是,一是搞懂lock、woken、starving所代表的功能,二是不是要1.13版本的锁实现,可以看早期实现,会更加简单一些。
4.总结
本来以为写锁的实现会很快能完成,结果看这一两百行代码用了差不多一个星期。个人也不太建议看1.13锁的具体实现,太过于繁杂了。可以看一下https://www.cnblogs.com/niniwzw/p/3153955.html,讲了锁的演变。
更容易让大家理解的方式是使用状态机,将加锁和写锁操作都放入状态机中,但锁状态分四部分,加上各种操作,绘制起来比较耗时,如果大家有兴趣可以绘制一下。
写这篇文章的时候,有些资料查的大学教程《计算机操作系统》,发觉这些书真是好书,不但准确而且易懂,以前都是死记硬背,现在感觉简直是宝书。
读写锁另起一篇文章写吧,这篇已经太长了,写不动了。
资料
-
线程同步(互斥锁与信号量的作用与区别)
-
信号量及其使用和实现(超详细)
-
Go专家编程
-
Golang同步机制的实现
-
【我的架构师之路】- 说一说go中的sync包
-
Golang 互斥锁内部实现
-
go sync.Mutex 设计思想与演化过程 (一)
-
一文读懂go中semaphore(信号量)源码
-
Go: 关于锁(mutex)的一些使用注意事项