1. 前言

        Golang中有两种类型的锁,Mutex (互斥锁)和RWMutex(读写锁)对于这两种锁的使用这里就不多说了,本文主要侧重于从源码的角度分析这两种锁的具体实现。

2. 引子问题

        我一般喜欢带着问题去看源码。那么对于读写锁,你是否有这样的问题,为什么可以有多个读锁?有没有可能出现有协程一直无法  获取到写锁的情况?带着你的疑问来往下看看,具体这个锁是如何实现的。

        如果你自己想看,我给出阅读的一个思路,可以先看读写锁,因为读写锁的实现依赖于互斥锁,并且读写锁比较简单一些,然后整理思路之后再去想一下实际的应用场景,然后再去看互斥锁。

        下面我就会按照这个思路一步步往下走。

3. 基础知识点

  • 知识点1:信号量
    • 信号量是 Edsger Dijkstra 发明的数据结构(没错就是那个最短路径算法那个牛人),在解决多种同步问题时很有用。其本质是一个整数,并关联两个操作:
      • 申请acquire(也称为 wait、decrement 或 P 操作)
        • acquire操作将信号量减 1,如果结果值为负则线程阻塞,且直到其他线程进行了信号量累加为正数才能恢复。如结果为正数,线程则继续执行。
      • 释放release(也称 signal、increment 或 V 操作)
        • release操作将信号量加 1,如存在被阻塞的线程,此时他们中的一个线程将解除阻塞。
  • 知识点2:锁的定义
    • 在goalng中如果实现了Lock和Unlock方法,那么它就可以被称为锁。

  • 知识点3:锁的自旋

  • 知识点4:cas算法:(最好有所了解,不知道问题也不大)

    • cas算法

      • 一个线程失败或挂起并不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。而CAS就是一种非阻塞算法实现,也是一种乐观锁技术,它能在不使用锁的情况下实现多线程安全,所以CAS也是一种无锁算法。

      • CAS  比较并交换,是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。CAS具体包括三个参数:当前内存值V、旧的预期值A即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。CAS 有效地说明了“ 我认为位置 V 应该包含值 A,如果真的包含A值,则将 B 放到这个位置,否则,不要更改该位置,只告诉我这个位置现在的值(A)即可。 ”整个比较并交换的操作是原子操作

        ABA问题:
        ABA问题是CAS中的一个漏洞。CAS的定义,当且仅当内存值V等于就得预期值A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。那么如果先将预期值A改成B,再改回A,那CAS操作就会误认为A的值从来没有被改变过,这时其他线程的CAS操作仍然能够成功,但是很明显是个漏洞,因为预期值A的值变化过了。如何解决这个异常现象?java并发包为了解决这个漏洞,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证CAS的正确性,即在变量前面添加版本号,每次变量更新的时候都把版本号+1,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。

4. 读写锁RWMutex

        首先我们来看看RWMutex大体结构:

  • 看到结构发现读写锁内部包含了一个w Mutex互斥锁。注释也很明确,这个锁的目的就是控制多个写入操作的并发执行
  •  writerSem是写入操作的信号量
  • readerSem是读操作的信号量
  • readerCount是当前读操作的个数

  • readerWait当前写入操作需要等待读操作解锁的个数

        这几个现在看不懂没关系,后面等用到了你再回来看就好了。问题的关键就在于锁和解锁的几个方法,因为我已经看过,所以推荐这几个方法的阅读顺序是RLock,Lock,RUnlock,Unlock。

4.1 RLock(获取读锁)

  • 先不看竞态检测的部分,先重点看红色框中的部分
  • 可以看到,其实很简单,每当有协程需要获取读锁的时候,就将readerCount + 1
  • 但是需要注意的是,这里有一个条件,当readerCount + 1之后的值 < 0的时候,那么将会调用runtime_Semacquire方法

  • 这个方法是一个runtime的方法,会一直等待传入的s出现>0的时候
  • 然后我们可以记得,这里有这样一个情况,当出先readerCount + 1为负数的情况那么就会被等待,看注释我们可以猜到,是当有写入操作出现的时候,那么读操作就会被等待。

4.2 Lock(获取写锁)

  • 写锁稍微复杂一些,但是样子也差不多,我们还是先来看红色框中的部分。
  • 首先操作最前面说的互斥锁,目的就是处理多个写锁并发的情况,因为我们知道写锁只有一把。这里不需要深入互斥锁,只需要知道,互斥锁只有一个人能拿到,所以写锁只有一个人能拿到。
  • 然后重点来了,这里的这个操作细细体会一下,atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders),是将当前的readerCount减去一个非常大的值rwmutexMaxReaders为1 << 30,大概是1073741823这么大吧
  • 所以我们可以从源码中看出,readerCount由于每有一个协程获取读锁就+1,一直都是正数,而当有写锁过来的时候,就瞬间减为很大的负数。
  • 然后做完上面的操作以后的r其实就是原来的readerCount。
  • 后面进行判断,如果原来的readerCount不为0(原来有协程已经获取到了读锁)并且将readerWait加上readerCount(表示需要等待readerCount这么多个读锁进行解锁),如果满足上述条件证明原来有读锁,所以暂时没有办法获取到写锁,所以调用runtime_Semacquire进行等待,等待的信号量为writerSem

4.3 RUnlock(释放读锁)

  • 如果是我们来写的话,可能就是将之前+1的readerCount,-1就完事了,但是其实还有一些操作需要注意。
  • 如果-1之后+1==0是啥情况?没错就是我们常见的,新手程序员,没有获取读锁就想去释放读锁,于是异常了。当然+1之后刚好是rwmutexMaxReaders,就是获取了写锁而去释放了读锁,导致异常。
  • 除去异常情况,剩下的就是r还是<0的情况,那么证明确实有协程正在想要获取写锁,那么就需要操作我们前面看到的readerWait,当readerWait减到0的时候就证明没有人正在持有写锁了,就通过信号量writerSem的变化告知刚才等待的协程(想要获取写锁的协程):你可以进行获取了。

4.4 Unlock(释放写锁)

  • 写锁释放需要恢复readerCount,还记得上锁的时候减了一个很大的数,这个时候要加回来了。
  • 当然加完之后如果>=rwmutexMaxReaders本身,那么还是新手程序员的问题,当没有获取写锁的时候就开始想着释放写锁了。
  • 然后for循环就是为了通知所有在我们RLock方法中看到的,当有因为持有写锁所以等待的那些协程,通过信号量readerSem告诉他们可以动了。
  • 最后别忘记还有一个互斥锁需要释放,让别的协程也可以开始抢写锁了。

        至此,读写锁的分析基本上告一段落了。针对于其中关于竞态分析的代码,有兴趣的小伙伴可以去了解一下。

5. 互斥锁Mutex

        互斥锁比读写锁复杂,但是好在golang给的注释很详细,所以也不困难(注释真的很重要)。我们先来看看里面的一段注释:

        很长的一段英文,我用英语四级的翻译能力给你翻译一下,可以将就看看,如果可以建议你仔细看英文看懂它,因为这对于后面的源码阅读非常重要。
///
这个互斥锁是公平锁

互斥锁有两种操作模式:正常模式和饥饿模式。
在正常模式下等待获取锁的goroutine会以一个先进先出的方式进行排队,但是被唤醒的等待者并不能代表它已经拥有了这个mutex锁,它需要与新到达的goroutine争夺mutex锁。新来的goroutine有一个优势 —— 他们已经在CPU上运行了并且他们,所以抢到的可能性大一些,所以一个被唤醒的等待者有很大可能抢不过。在这样的情况下,被唤醒的等待者在队列的头部。如果一个等待者抢锁超过1ms失败了,就会切换为饥饿模式。

在饥饿模式下,mutex锁会直接由解锁的goroutine交给队列头部的等待者。
新来的goroutine不能尝试去获取锁,即使可能根本就没goroutine在持有锁,并且不能尝试自旋。取而代之的是他们只能排到队伍尾巴上乖乖等着。

如果一个等待者获取到了锁,并且遇到了下面两种情况之一,就恢复成正常工作模式。
情况1:它是最后一个队列中的等待者。
情况2:它等待的时间小于1ms

正常模式下,即使有很多阻塞的等待者,有更好的表现,因为一轮能多次获得锁的机会。饥饿模式是为了避免那些一直在队尾的倒霉蛋。
///

       我的话简单总结就是,互斥锁有两种工作模式,竞争模式和队列模式,竞争就是大家一起抢,队列就是老老实实排队,这两种工作模式会通过一些情况进行切换。

        可以看到,相对读写锁,结构上面很简单,只有两个值,但是千万不要小瞧它,减少了字段就增加了理解难度。

  • state:将一个32位整数拆分为:
  • 当前阻塞的goroutine数(29位)
  • 饥饿状态(1位,0为正常模式;1为饥饿模式)
  • 唤醒状态(1位,0未唤醒;1已唤醒)
  • 锁状态(1位,0可用;1占用)
  • sema:信号量

        方法也很简单,就是Lock和Unlock两个方法,一个上锁,一个解锁,没啥好说的。

5.1 一个方法

        我们先来看一个的要用到的方法

        func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
        这个函数,会先判断参数addr指向的被操作值与参数old的值是否相等,如果相等会将参数new替换参数addr所指向的值,不然的话就啥也不做。
        需要特别说明的是,这个方法并不会阻塞。

5.2 几个常量

        这是定义的几个常量,我们在一开始的注释周围可以看到,后面需要用到,暂时记住它们的初始值就好。

mutexLocked = 1 << iota // 1左移0位,是1,二进制是1,(1表示已经上锁)
mutexWoken // 1左移1位,是2,二进制是10
mutexStarving // 1左移2位,是4,二进制是100
mutexWaiterShift = iota // 就是3, 二进制是11

starvationThresholdNs = 1e6 // 这个就是我们一开始在注释里面看到的1ms,一定超过这个门限值就会更换模式

5.3 Lock获取锁

        因为Lock方法比较长,所以我切分一段段看,需要完整的请自己翻看源码。要注意的一点是,一定要时刻记住,Lock方法是做什么的,很简单,就是要抢锁。看不懂的时候想想这个目标。

        第一步,判断state状态是否为0,如果为0,证明没有协程持有锁,那么就很简单了,直接获取到锁,将mutexLocked(为1)赋值到state就可以了。

        看后面的方法时,告诉需要告诉你们一个小技巧,当遇到这种位操作很多的情况,有两个方法挺好用,对于你看源码会有帮助:
        第一个是将所有定值先计算,然后判断非定值的情况;
        第二个是将所有的计算写下来,自己用笔去计算,不要执着于打字。

        然后我们以下面这个段举例:

        首先,看注释应该能明白这一段大致意思是,如果不是饥饿模式,就会进行自旋操作,然后不断循环。

        然后根据上面的技巧,old&(mutexLocked|mutexStarving) == mutexLocked
(下面均为二进制)
mutexLocked = 1
mutexStarving = 11
mutexLocked = 1
        这三个是定值,所以我们容易得到,满足情况的结果为,当old为xxxx0xx(二进制第三位为0)等式成立。也就是我们一开始说的,state的第三位是表示这个锁当前的模式,0为正常模式,1为饥饿模式。

        那么第一个if就表示,如果当前模式为正常模式,且可以自旋,就进入if条件内部。
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&

        同样的分析,awoke表示是否唤醒,old&mutexWoken是取第二位,0表示当前协程未被唤醒,old>>mutexWaiterShift表示右移3位,也就是前29位,不为0证明有协程在等待,并且尝试去对比当前m.state与取出时的old状态,尝试去唤醒自己。然后自旋,并且增加自旋次数表示iter,然后重新赋值old。再循环下一次。

(你自己理一理,确实有点绕,仔细想想就想通了就对了。)

        以上是使用自旋的情况,就是canSpin的。

        然后进行判断old&mutexStarving == 0就是第三位为0的情况,还是所说的正常模式。new就马上拿到锁了,new |= mutexLocked,表示或1,就是第一位无论是啥都赋值为1

old&(mutexLocked|mutexStarving),也就是old & 0101
        必须当old的1和3两个位置为1的时候才是true,也就是说当前处于饥饿模式,并且锁已经被占用的情况,那么就需要排队去。
排队也很精妙,new += 1 << mutexWaiterShift
这边注意是先计算1 << mutexWaiterShift也就是将new的前29位+1,就是表示有一个协程在等待了。

        好了到这里你的位操作应该就习惯的差不多了,之后我就直接说结论,不仔细的帮你01表示了,你已经长大了,要学会自己动手了。

        如果当前已经标记为饥饿模式,并且没有锁住,那么设置new为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}

        如果唤醒,需要在两种情况下重设标志
if awoke {
如果唤醒标志为与awoke不相协调就panic
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
设置唤醒状态位0,被唤醒
new &^= mutexWoken
}

        如果获取锁成功

        old&(mutexLocked|mutexStarving) == 0成立表示已经获取锁,就直接退出CAS

        中间这一段我就不多解释了,就是最前面注释说的,满足什么条件转换什么模式,不多说了。然后从队列中,也就是前29位-1。
        需要注意其中有一个runtime_SemacquireMutex和之前看的的runtime_Semacquire是一个意思,只是多了一个参数。

        这个就是这个方法的注释。可以看到,就是多了个队列去排队。

        如果获取锁失败,old刷新状态再次循环,继续cas

5.4 Lock释放

        Unlock就相对简单一些,竞态分析不看。
        其实我们自己想也能想到,unlock就是将标识位改回来嘛。
        然后因为我们已经看过读写锁了,也是同样的道理,如果没有上锁就直接解锁,那肯定报错嘛。

        然后如果是正常模式,如果没有等待的goroutine或goroutine已经解锁完成的情况就直接返回了。如果有等待的goroutine那就通过信号量去唤醒runtime_Semrelease(注意这里是false),同时操作一下队列-1

        如果是饥饿模式就直接唤醒(注意这里是true),反正有队列嘛。

6. 互斥锁总结

        其实话说回来,我们其实看起来也简单,没有冲突的情况下,能拿就拿呗,如果出现冲突了就尝试自旋解决(自旋一般都能解决)如果解决不了就通过信号量解决,同时如果正常模式就是我们说的抢占式,非公平,如果是饥饿模式,就是我们说的排队,公平,防止有一些倒霉蛋一直抢不到。

        整体总结一下,看完源码我们发现,其实锁的设计并不复杂,主要设计我们要学到cas和处理读写状态的信号量通知,对于那些位操作,能看懂,学可能一时半会学不会,因为很难在一开始就设计的那么巧妙,你也体会到了只用一个变量就维护了整个体系是一种艺术。

7. 参考资料