锁的概念

对于某一块代码段,多个线程或者协程同时执行会产生一些不符合预期的结果,就需要使用信号量保护这一段代码区,只能由一个线程来占用和执行这段代码.这相当于是一个大型的原子操作,由软件层面来实现.下面是一段结果不符合预期的代码段:

var count =0
func main()  {
   wg:=sync.WaitGroup{}
   wg.Add(2)
   go add(&wg)
   go add(&wg)
   wg.Wait()
   fmt.Println(count)
}
func add(wg *sync.WaitGroup){
   for i:=0;i<100000;i++{
      count++
   }
   wg.Done()
}

结果大多数情况下,都不是20000,这和预期结果是不一样的.我们来分析一下为什么会和预期不一样:

count++操作编译成汇编指令操作时(cpu只认这种指令),会分成这几步,

1.cpu从内存中获取count值

2.执行count加1操作

3.将count值写到缓存(或者内存中)

每一步都是原子操作(不可中断操作并且在这个时间点会独占某个资源),这三步合在一块就不是原子操作了.可能会出现两个线程(在不同的cpu上)都执行了读取count操作,其中一个线程执行了加一操作并将count值写到内存中更新,另外一个线程对这一操作无感知,并继续操作自己的旧值,结果就和预期不一致了.

单处理器不会出现这种情况,只有多处理器才会出现和预期不符的结果.

这个时候就需要加锁(让count++操作变成软件层面的原子操作)或者使用atomic的add操作(硬件层面的原子操作),同一时间点只让一个cpu不可中断的执行这些指令.

原子操作的概念和实现

概念就是对于某个指令或者某个代码段或者某个资源,在同一时间内,有且仅有一个cpu可以执行,并且是不可中断的,即一旦开始执行就执行到代码段末尾.

硬件层面上,有两种实现方案,目前cpu架构多采用第二种方案.

1.对总线加锁.在x86 平台上,CPU提供了在指令执行期间对总线加锁的手段。对总线加锁以后,别的cpu就不能通过总线访问内存数据了.显而易见,这个加锁指令在性能上有些不尽人意,如果有其他cpu想访问别的资源,也是访问不了的,我认为锁的粒度实在太大,严重影响cpu执行效率.

2.使用缓存一致性协议,对缓存行加锁。当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,发现自己缓存中缓存该变量的缓存行是无效的,那么它就会从内存重新读取。

软件层面,使用硬件层面的原子操作机制,获取临界区代码段的操作权,然后执行临界区代码段.大多数编程语言锁的实现都是借助于硬件层面实现的加锁机制,使用操作系统(借助硬件加锁)提供的一些基本原子操作(cas,add操作等等)完成对锁的实现,golang锁的实现也不例外.

mutex源码分析

对锁和原子操作有了基本认识以后,再阅读golang提供的锁源码就不费劲了.根据源码进行逐步解释,有些细节本人也没细看,只能说个大概意思.golang版本1.10.3

先讲一下mutex锁的两种模式,正常模式和饥饿模式

正常模式,协程在先进先出的队列里进行排队,前一个协程执行结束解锁时,会唤醒等待队列中的一个协程,唤醒的协程和刚来的协程(还没进入队列)竞争锁的所有权.刚进来的协程当前正在持有着cpu,资源也不需要重新调度,刚唤醒的协程大概率竞争不过新来的协程.当唤醒的协程沉睡超过1ms,会将锁置为饥饿模式.

饥饿模式,解锁的协程将锁的所有权移交给等待队列中的队首协程,新到来的协程不会尝试去获取锁的控制权(即时当前是可获取状态),也不会尝试去自旋,会直接加入到队尾等待被唤醒.

饥饿模式的解除,当前获取mutex所有权的协程是阻塞队列的最后一个协程或者该协程等待时间小于1ms,则将饥饿模式转换成正常模式.

正常模式和饥饿模式优劣,正常模式性能会更高一些,它倾向于新来的协程获取到mutex的所有权,减少了资源切换过程(唤醒老协程需要重新调度资源到cpu中).饥饿模式是防止某个协程等待时间过久,导致预期之外的问题.

// mutex结构体
type Mutex struct {
   state int32 //状态标识符
   sema  uint32
}
const (
   mutexLocked = 1 << iota // 上锁标志位
   mutexWoken //协程有唤醒状态
   mutexStarving //mutex是否处于饥饿模式
   mutexWaiterShift = iota //当前等待mutex的协程数量偏移
)
// lock 获取锁的控制权,对部分和本文内容关联不高的代码做了删减
func (m *Mutex) Lock() {
// 使用cas操作,完成对mutex的抢占,抢占成功则获取到mutex的所有权,失败则去排队等待锁的释放
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)
        return
   }
   var waitStartTime int64 //当前协程等待了多长时间
   starving := false //该协程是否要进入饥饿模式标识
   awoke := false //当前协程是否处于唤醒状态标识
   iter := 0 //自旋次数标识
   old := m.state
   for {
      //饥饿模式下协程不用自旋,新来的协程直接进入队列等待被唤醒,当自旋次数大于4时,也会停止自旋
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         // 自旋协程尝试对mutexWoken标志位上锁,有唤醒协程的情况下,mutex持有协程解锁以后就不需要唤醒排队等待的协程
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
         }
         runtime_doSpin()//自旋
         iter++
         old = m.state//更新到最新状态
         continue
      }
      new := old
      // 新来的协程在饥饿模式下是不能直接获取到mutex所有权的
      if old&mutexStarving == 0 {
         new |= mutexLocked
      }
     // 当mutex已被占有或者mutex处于饥饿模式,排队者数量+1
      if old&(mutexLocked|mutexStarving) != 0 {
         new += 1 << mutexWaiterShift
     }
      // 当前协程将mutex切换成饥饿模式,如果当前处于解锁状态,则不需要切换成饥饿模式,切换起来没有意义
      if starving && old&mutexLocked != 0 {
         new |= mutexStarving
      }
      if awoke {
         // 当前协程处于唤醒状态有两种可能,1.被解锁的协程唤醒2.刚进来的协程在自旋中成功设置了唤醒标志位
     // 不管怎样,将woken标志位置零
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         new &^= mutexWoken
      }
        //cas操作,当前协程尝试去获取锁
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         if old&(mutexLocked|mutexStarving) == 0 {
            break //之前的mutex既没有被加锁也不是饥饿模式时,说明锁已经获取成功,当前协程直接返回,执行临界区代码
         }
         //判断之前是否排队等待过mutex
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()//初始化等待开始时间
         }
         runtime_SemacquireMutex(&m.sema, queueLifo)
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs//等待超过1ms,则标记当前已经饥饿,需要设置mutex为饥饿模式
         old = m.state
         if old&mutexStarving != 0 {
            // mutex处于是唤醒状态或者上锁状态,但是等待者数量为零,则报错抛出异常
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
               // 退出饥饿模式
               delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
         }
         awoke = true
         iter = 0//自旋次数置为零,进入新的循环,去尝试获取mutex的所有权
      } else {
         old = m.state  //进入新的循环,去尝试获取mutex的所有权
      }
   }
}