golang 的metux 的实现有几个点做法是非常有意思的,一个是底层数据结构上,用了平时很少用的位运算,第二个,用到了自旋,并做了自旋策略控制,最后是用了信号量控制协程。
首先是golang mutex 中用了很多位运算。位运算不做细介绍,对内存利用比较高的算法都有涉及,比如redies 的压缩列表,比如golang 的Protobuffer。
有几个关键点,iota 在定义的时候,用的多,做自增运算:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
// 这里 第一个变量为1 ,第二个变量为10, 第三个为10
然后,位运算的求或和求与用的很多,一个是与1 求或将某位置1,一个是与0 求与将某位置0,这些都是用于改变某些标志位的方式,不要看懵逼了:
new := old | mutexLocked // 将old 的最后一位置1,表示new 锁一定是被持有状态
if old&mutexLocked != 0 { // 将最后一位保留后,其他位全部置0, 判断最后一位的状态是不是0,判断是不是被持有
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1<<mutexWaiterShift
}
第二个是golang 的加锁会自旋,4次自旋没拿到锁后再将协程休眠,这样可以减少切换成本。这里关判断条件是自旋次数,cpu核数,p 的数量:
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
最后是利用信号量挂起和唤醒协程,核心函数是
runtime_SemacquireMutex(&m.sema)
runtime_Semrelease(&m.sema)
获取信号时,当s > 0 ,将s--,如果s 为负数,会将当前g 放入阻塞队列,挂起直到s>0。
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return // cas 获取到锁,直接返回
}
awoke := false //循环标记
iter := 0 //循环计数器
for {
old := m.state //保存当前锁状态
new := old | mutexLocked //将状态位最后一位指定1
if old&mutexLocked != 0 { //锁被占用
if runtime_canSpin(iter) { //检查是否可以进入自旋锁,4次
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//awoke标记为true
awoke = true
}
runtime_doSpin()//进入自旋
iter++
continue
}
new = old + 1<<mutexWaiterShift //锁被占用,且自旋次数超过4次,挂起协程数+1,下面步骤将g 挂起并等待
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken //清除标志
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新协程计数
if old&mutexLocked == 0 {
break
}
// 锁请求失败,进入休眠状态,等待信号唤醒后重新开始循环,一直阻塞在这里
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解锁的过程就和加锁反过来即可:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加锁位
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
old := new
for {
//当休眠队列内的等待计数为0或者自旋状态计数器为0,退出
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// 等待协程数-1,更改清除标记位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)// 释放锁,发送释放信号,对应之前的acquire
return
}
old = m.state
}
}