目前golang的版本是1.12,其中的mutex是增加了普通模式和饥饿模式切换的优化版本,为了便于理解,这里先从上一个版本1.7版本的mutex开始分析,以后再对优化版本进行说明。

1.7版本Mutex实现

Mutex定义

// mutex是互斥锁
// mutex的零值是没有加锁的
//在使用之后不能被拷贝
type Mutex struct {
    state int32  //状态标识
    sema  uint32 //信号量
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

  其中state是记录用来记录加锁状态的,将一个整型按位划分来表示不同的含义,从低到高分别为第1位到第32位,

  • 第1位表示是否被锁住
  • 第2表示是否被唤醒
  • 第3位到第32位表示等待在mutex上协程数量
      其中sema是信号量,是一个非负数的全局变量,下面对信号量进行简单说明。

信号量

  信号量是进程间通信处理同步互斥的机制,通过一个计数器来控制对共享资源的访问次数限制。例如一个办公室有两台打印机,有几十台电脑连上,这是同时只能允许两个电脑进行打印,而其他电脑必须排队等待完成后才能打印。

  sema就是信号量,是一个非负数的全局变量,该变量有两个操作P和V,PV操作都是不可中断的。

P(S)

(1)执行S=S-1;
(2)进行以下判断:

  • 如果S < 0,进入阻塞队列,直到满足S>=0,如果S >= 0, 直接返回
  • 因此P操作执行一次意味着分配一个资源,如上打印机意味着是资源,当S小于0意味着没有可用资源了,只能一直等待,直到资源空闲出来时才能继续。

V(S)

(1)执行S=S+1;
(2)进行以下判断:

  • 如果S > 0,直接返回 如果S <= 0, 释放阻塞队列中的第一个等待进程
  • 因此V操作执行一次意味着释放一个资源,当S小于等于0时,意味着还有进程在请求资源,此时释放了一个资源,就需要从等待队列中拿出一个进程来使用此刻释放的资源。

golang中信号量操作

runtime_Semacquire

  func runtime_Semacquire(s uint32),P操作,等待s大于等于0,源码在runtime/sema.go中

runtime_Semrelease

  func runtime_Semrelease, V操作,阻塞等待被唤醒,目前版本在runtime/sema.go中(定义稍有不同了)。

直接使用信号量实现互斥锁

  考虑下,如果直接用信号量来实现互斥,即新建一个sema=1,然后用PV操作runtime_Semacquire和runtime_Semrelease来实现,也可以做到当一次请求时,拿到资源进行执行,后续请求阻塞,进入等待队列,不考虑性能,按照这样简单的思路实现如下:

type Mutex struct {
    sema uint32
}

func NewMutex() *Mutex {
    var mu Mutex
    mu.sema = 1 // 初始设置sema=1
    return &mu
}

func (m *Mutex) Lock() {
        runtime_Semacquire(&m.sema)
}

func (m *Mutex2) Unlock() {
    runtime_Semrelease(&m.sema)
}

问题

  当加锁一次,代码中解锁了两次, 会导致sema值变化而不提示任何错误,即这时sema=2,资源数量发生了变化,导致后续运行异常,所以多次解锁时需要返回异常。

通过加锁次数避免重复解锁

type Mutex struct {
        key  int32
        sema uint32
}

func (m *Mutex) Lock() {
        if atomic.AddInt32(&m.key, 1) == 1 {
                // changed from 0 to 1; we hold lock
                return
        }
        runtime_Semacquire(&m.sema)
}

func (m *Mutex) Unlock() {
        switch v := atomic.AddInt32(&m.key, -1); {
        case v == 0:
                // changed from 1 to 0; no contention
                return
        case v == -1:
                // changed from 0 to -1: wasn't locked
                // (or there are 4 billion goroutines waiting)
                panic("sync: unlock of unlocked mutex")
        }
        runtime_Semrelease(&m.sema)
}

  这个解决方案除了解决了我们前面说的重复加锁的问题外,还对我们初始化工作做了简化,不需要构造函数了。

  • 初始:key=0, sema = 0

  • Lock第一次:key+1=1返回,sema=0,即第一次不进行P操作,直接将key加1表示获取了锁。

  • Lock第二次:key=2,进行P操作,发现sema-1 =-1<0,阻塞等待获取锁。
      当执行了一次Lock后,key=1,sema=0,执行以下操作时:

  • Unlock第一次:key-1=0返回,sema=0,第一次解锁不执行V操作,直接key减1表示释放锁。

  • Unlock第二次:key-1=-1,表示解锁过了,返回异常。
      当执行了两次Lock后,key=2,sema=-1,执行以下操作时:

  • Unlock第一次:key-1=1,执行V操作runtime_Semrelease,发现sema+1=0,会阻塞直到唤醒了其他协程,然后返回。
      简单来说,增加一个key变量后,sema=0表示有一个资源,跟只用信号量时sema=1含义一样,在golang mutex也是基于此实现的。

Mutex操作解读(最新版)

Lock

func (m *Mutex) Lock() {
-----------------代码块1 start-----------------
    // Fast path: grab unlocked mutex.  
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
-----------------代码块1 end----------------- 
    awoke := false
    iter := 0
    for {
  -----------------代码块2 start-----------------  
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }
    -----------------代码块2 end-----------------
    -----------------代码块3 start-----------------
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
-----------------代码块3 end-----------------
-----------------代码块4 start-----------------
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
-----------------代码块4 start-----------------
    }
 
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

  将上面代码标注为4块,下面依次进行分析。对代码逻辑进行详细分析之前,先介绍下其中用到部分函数。

关键函数

race.Acquire

		if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }

  竞争检测逻辑。go中使用goroutine比较常见,在大型项目中可能会在多个goroutine中用到某个全局变量,如果有竞争就需要加锁操作。

runtime_canSpin

//go:linkname sync_runtime_canSpin sync.runtime_canSpin
func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
 return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
 return false
 }
 return true
}

  判断是否需要自选,golang中自旋锁并不会一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制, 传递过来的iter大等于4或者cpu核数小等于1,最大逻辑处理器大于1,至少有个本地的P队列,并且本地的P队列可运行G队列为空才会进行自旋。

runtime_doSpin

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
func sync_runtime_doSpin() {
 procyield(active_spin_cnt)
}

  进行自旋操作,会调用procyield函数,该函数也是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。

代码块1

    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

  如果state=0,即没有锁住、没有唤醒且没有等待队列,可直接拿到锁,将状态置为锁住并返回,这相当于是上面demo版中从key=0,sema=0的状态,变为key=1,seme=0的状态。

代码块2

		//最新状态
		old := m.state  
        new := old | mutexLocked
        //已经被锁住
        if old&mutexLocked != 0 {
        	//判断是否需要自选,这是在for循环中,iter次数可能已经超过不需要自旋了,或者其他条件
            if runtime_canSpin(iter) {
                // 主动自旋是有意义的,因为会尝试唤醒锁,
                //这样上个协程此时unlock的话,就不会唤醒其他协程
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    //自己没有唤醒,且原状态没有唤醒,
                    //且有协程在排队且设置唤醒标识成功,
                    //说明上个的协程此时unlock了,
                    awoke = true
                }
                //自旋一段时间
                runtime_doSpin()
                iter++
                continue
            }
            //不需要自旋,将state的等待队列数据加1
            new = old + 1<<mutexWaiterShift
        }

代码块3

        if awoke {
            //代码块1中将awoke置为1了,标识被唤醒
            //代码块1中只有设置了唤醒标识,awoke才会为true,因此不会new&mutexWoken == 0
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            //既然当前协程被唤醒了,需要将state置为未唤醒
            new &^= mutexWoken
        }

代码块4

        //这里new有四种值
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                //没有锁住,直接返回
                break 
            }
            //当前锁住的,阻塞在此处等待,会让出cpu
            runtime_Semacquire(&m.sema)
            //从阻塞中返回,设置当前协程被唤醒了
            awoke = true
            iter = 0
        }

Unlock

func (m *Mutex) Unlock() {
	//race检测
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
 
    // 判断是否多次解锁,多次解锁则抛出异常
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        panic("sync: unlock of unlocked mutex")
    }
 
    old := new
    for {
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}

  for循环中,因为old在更新,第一个if语句会在以下两种情况时返回:

  • 当前锁上没有协程在等待
  • 当前锁已经被其他协程lock了或者唤醒了
      第二个if语句,cas原子操作将等待协程数目减1,并设置唤醒标识,阻塞在runtime_Semrelease处,直到有其他协程被唤醒才返回。
      看到这里,就可以知道,唤醒操作有两种:
    (1)lock函数,执行自旋过程中主动唤醒自己,会执行到awoke = true相关代码;
    (2)unlock函数,原协程设置唤醒标识,本协程被动唤醒,不会执行awoke = true相关代码。