前言:
一直使用锁,大学的时候也在操作系统中学过锁的概念以及实现的思路,但是从未真正的从代码级别看过如何实现,趁着这个机会,看看。
一、为何golang底层的socket的底层读写都要加锁
当读到netFD的源码的时候,发现netFD的Read和Write函数都会加锁,查看netFD的结构体,发现它加了一个
fdmu fdMutex
看其类型名称,就知道它是专门为文件描述符定制的锁,下面以这个为切入点探索下各种锁。
1.1 golang互斥锁原理
互斥锁是我是在golang中最常用的锁了,互斥锁是在多协程的时候使用,在某一时刻只能有一个协程占用资源。结合在网络上查阅的资料、golang锁的源码,现总结如下:
golang 提供的互斥锁为sync.Mutex,并且只提供了lock和unlock,我们就以这两个函数为切入点。
先来看看Mutex的结构体
type Mutex struct {
state int32 //互斥锁上锁状态
sema uint32 //信号量,向处于Gwaitting的G发送信号
}
const (
mutexLocked = 1 << iota // 1 互斥锁是锁定的
mutexWoken // 2 唤醒锁
mutexWaiterShift = iota // 2 统计阻塞在这个互斥锁上的goroutine数目需要移位的数值
)
注意:
golang的const设置常亮有一个小的知识点,iota的含义是在const中第几行的意思,从第0行开始,比如mutexLocked = 1<<iota 最终等于1,mutexWoken为什么等于2呢?因为const如果不显示的赋值给变量,变量会复制上一行的赋值语句,也是1<<iota,但是它是在第一行,所以的应该是1<<1即为2,第三行的mutexWaiterShift的值直接等于iota,也就是2了。
我们看到Mutex的采用的是信号量的机制,解释下mutexLocked,mutexWoken,mutexWaiterShift三个变量作为一定恒定值,用于对state进行位移操作来修改或者读取对应位的值。
state表示Mutex的状态,是一个32位int类型,其不同位代表含义不同,
0位代表锁的占用状态(1被占用,0可用),1位代表当前协程是否被唤醒(1被唤醒,0sleep中),2~31位代表当前阻塞在Mutex的协程数量。
ok,先上Lock代码+翻译
func (m *Mutex) Lock() {
//首先查看当前的state是否为0(未被占用),如果为0,则把state的第0位置为1,表示占用当前锁,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
//初始化唤醒标志位为否,自旋锁次数置为0
awoke := false
iter := 0
for {
//保存当前锁状态
old := m.state
//创建一个新的锁状态,把锁占用状态设置为1
new := old | mutexLocked
//判断当前锁是否被占用
if old&mutexLocked != 0 {
//是否可以进入自旋
if runtime_canSpin(iter) {
//条件:1.我们设置的唤醒标志位为否
// 2.当前锁的唤醒标志位为否
// 3.还有协程阻塞在当前的锁
// 4.尝试设置当前锁的唤醒标志位为1(目的是用于通知占用当前锁的协程在释放锁的时候别再唤醒其他的协程了,稍后看下unlock的源码就可以验证)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//把唤醒标志位设为true
awoke = true
}
//开始自旋
runtime_doSpin()
//自旋次数加1
iter++
continue
}
//把锁的等待个数加1
new = old + 1<<mutexWaiterShift
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//清除唤醒标志
new &^= mutexWoken
}
//尝试更新当前锁的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//如果当前锁不在被占用,结束循环
if old&mutexLocked == 0 {
break
}
//如果所还在被占用,则把当前协程sleep,等待被唤醒
runtime_SemacquireMutex(&m.sema)
//当前协程被唤醒后,设置该协程的唤醒标志位为true
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
理解这段代码也有几个小知识先注明下:
- 我们看到代码中有很多的位移操作,与、或这些都比较简单,但是有一个“&^”的操作符,解释下它的含义
此运算符是双目运算符,按位计算,将运算符左边数据相异的位保留,相同位清零。 - race这个玩意目前先忽略,与我们理解锁没有关系,这个是go做race检测时候用的。
- 我们看到很多地方用到了atomic包的一些函数,atomic包是由golang提供的low-level的原子操作封装,主要用来解决进程同步为题,官方并不建议直接使用。操作系统级的锁的实现方案是提供原子操作,然后基本上所有锁相关都是通过这些原子操作来实现。
- CompareAndSwapInt32(&addr, old, new)的含义是:如果*addr == old ,那么*addr = new
- runtime_canSpin和runtime_doSpin分别是判断是否可以自选与自旋操作
golang的互斥锁虽然引用了自旋锁,但是不会一直的自旋下去,只是会尝试几次,看下其源码
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
限制条件:
1).多核(如果是单核cpu没必要这么空耗)
2).GOMAXPROCS>1
3).至少有一个运行的P并且local的P队列为空(与协程的调度有关知识,稍后详细介绍)
4).自旋次数不超过设定的阈值
那么继续看下runtime_doSpin干了什么
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
查阅资料结果:procyield函数是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化。
总结下,所谓自旋就是一直循环等待了,只是golang的互斥锁并不会一直循环等待,只会在特定的条件下尝试几次。自旋的操作无非就是空耗CPU了。
- 我们再来分析下runtime_SemacquireMutex干了什么
func runtime_SemacquireMutex(*uint32)
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
func semacquire(addr *uint32, profile bool) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
if cansemacquire(addr) {
return
}
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
for {
lock(&root.lock)
atomic.Xadd(&root.nwait, 1)
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}
我只把我理解的部分说下
1).cansemacquire这个函数的作用其实就是抢占信号量,来看下源码
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
注意下,其实我们的互斥锁的信号量最大也就是1
2).semroot(addr)它返回的是一个semroot类型的变量,看下这个结构体的声明
type semaRoot struct {
lock mutex
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}
解释下,这个结构体其实就是维护信号量与争夺信号量的协程的信息。结构体中维护了一个链表,用于保存等待信号量的协程信息,head和tail就是用于对该链表操作的入口,代码中root.queue(addr, s) 实际上就是把当前的协程入链表中。nwait字段代表的就是当前等待信号量的协程数量。
semaRoot结构体中还有一个mutex类型的锁,注意这跟我们Mutex并非是同一中类型,这是golang内部使用的类型锁,在这里是对链表的操作加锁,对于该锁的详细的源码不再描述,感兴趣同学可以看源码。
3).在for循环中我们看到一个atomic.Xadd(&root.nwait, -1),看到atomic我们就知道这个操作是系统级原子操作,它的作用就是对nwait做减1的操作。
4).接着我们来看goparkunlock,它的作用就厉害了,该函数是解开当前协程与实际的执行体的联系,使得当前协程休眠,释放执行体,这个设计到goroutine的调度原理,稍后会详细介绍。那么这个函数什么时候执行完毕呢?当该协程被唤醒的时候,一般该协程被唤醒也就是占用该信号量的协程释放资源的时候,这时它调用cansemacquire返回为true,否则继续调用goparkunlock休眠,直到抢到信号量则break。
总结下这个函数的作用,其实就是把首先要争夺下信号量,如果争夺不成功,就把自己放到争夺信号量的队列中,并且休眠,直到被唤醒为止。
ok,这几个预备的知识终于描述完了,我们来总体看下互斥锁的流程。

由于里面的逻辑比较绕,最好找几个情况下的例子走一遍流程,也许会更好。
如果你看着上图还是觉得饶,那么我建议你结合unlock的流程来看,也许会好一些。先看下unlock的源码
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}
unlock看起来就舒服很多了,ok我们来梳理下,当一个协程要释放它锁占有的互斥锁时:
1.首先它会把当前锁的状态值减1,在释放前一般该锁的最后一位为1,减1就是把占用状态设置为0(原子操作),然后把设置后的状态保存下来。
小思考:我们来设想下,假如刚把当前锁占用状态设置为0,刚好有另外一个协程进入lock的for循环,我们回过头去看下Lock的源码,
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
...
}
if awoke {
...
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
前两个if没进去,直接看第三个if,atomic.CompareAndSwapInt32(&m.state, old, new)执行成功后,抢占锁成功,if old&mutexLocked == 0 判断为true,则直接跳出循环。
看来锁就直接被另外的这个协程抢占了,那么会不会影响unlock的执行呢?实际是不会的(看第4步的解释),我们继续往下看。
2.接着,会判断 if (new+mutexLocked)&mutexLocked==0
这句话的含义其实就在
判断刚进入unlock的时候,锁的状态是否为非占用状态,如果所未被占用,而又调用unlock,是不允许的,需要抛异常
3.用old copy下new存储下来,接着就进入了循环:
来看第一个判断,
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
如果这个条件成立,直接返回,不用往下走了(往下走是唤醒等待信号量的协程),来看下这两个或关系的条件:
1).当前的协程等待数量为0,则不用唤醒协程了。
2).当前协程被锁,或者被唤醒了
第二个条件就可以跟Lock中的自旋时候的操作对应上,我们之前描述在lock中自旋的时候,会把锁的唤醒标志位设置为true,用于提醒占用锁的协程在释放锁的时候不要再唤醒其他协程了,就是我们的第二个条件。
4.首先更显下new,等于old上的等待协程数量减1,然后在把唤醒标志位设置true。接着有一个if判断,if atomic.CompareAndSwapInt32(&m.state, old, new)
这句话的含义是,判断当前锁的状态在程序运行的这段时间没有变化的话,就直接把锁状态更新为new,注意什么情况下不成功呢?这就与第1不中的‘小思考’联系上了, 如同小思考中说的,如果在这段时间锁被直接抢占了,if就会判断为false,如果判断为false,会直接 old = m.state 更新下old的状态,这样的话会继续循环,再次到
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0
1.2 golang的socket的加锁原理
下面进入本文的主题,讲述对socket读写时候加锁的源码。
对socket读写加锁的是
先看下fdMutex结构体的组成,state指的是锁的状态信息,rsema和wsema分别指的是读的信号量与写的信号量。
fdMutex关于读和写的通过参数的方式进行区分,二者采用了相同的函数rwlock和rwunlock,参数为bool类型,true代表读锁操作,false代表写锁操作。
解释下const中常量的作用,这些常量是用于对state变量进行未操作来读取state中不同位代表的含义的值。
state是一个64位的变量

ok,我们分别看下rwlock和rwunlock源码
func (mu *fdMutex) rwlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
var new uint64
if old&mutexBit == 0 {
new = (old | mutexBit) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
} else {
new = old + mutexWait
if new&mutexMask == 0 {
panic("net: inconsistent fdMutex")
}
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexBit == 0 {
return true
}
runtime_Semacquire(mutexSema)
}
}
}
解读下过程:
1).首先判断是否为读加锁,这个地方我们能看出对socket的读写锁操作没什么区别,而且上面我们也看到读写锁除了在引用数量的地方有关联,其他地方二者使用的是state不同的位,也就是说读和写操作一般情况下是没有关系的,不会互斥。
我们tcp的socket是全双工的,锁的这个特性也可以理解。
2).我们看for循环,首先读取当前锁的状态存储在old中,这里有一个疑问,在存储当前变量的时候,用了atomic.LoadUint64函数,我们知道这是一个原子,为什么要用原子操作呢?这不就是一个对整型变量的赋值吗?难道整型的赋值不是一个原子操作吗?记得刚才我们看到的互斥锁,在存储当前锁状态的时候是直接赋值的。
old := m.state
这里留一个个人疑问,后续确认原因后补充。
ok,继续。接着判断当前锁是否已经被关闭了,如果被关闭直接返回false,加锁失败。
3).接着判断锁是否被占用,如果未被占用:把old占用标志位置为1,并且把当前锁引用数+1存储到new中,接着判断new中的引用个数是否为0,如果为0就是一个异常了。这个判断引起了我的一个思考:
什么情况下会使得引用数为0呢,可能是异常的操作把其置为0,这是可以理解的。但是还有一个极端的情况,如果锁的引用个数刚好为,你再加1,岂不是再去读取引用个数就为0了,引用个数还是有一个极限的,当然,那些读锁等待数量与写锁等待数量同样有这样的问题。当然了,这种情况极少会发生,哪有那么多的协程同时去操作一个锁。
ok,继续。如果判断锁已经被占用:那么就把old中的锁等待数量加1然后放到new,接着同样的道理检测下new中的等待数量是否为0,如果为0则是异常情况。
4).接着就要修改当前锁的状态了,如果再第3)步中抢占锁成功,则会把锁状态改为抢占成功的new,下面接着下面加一个判断,old中的锁是否被抢占(这个等会有用),如果未被抢占,则返回true,抢锁成功;如果第3)步中抢占锁失败,同样把锁状态改为new(只是等待数量加1),下面的判断就用到了,old的锁在这时已经被抢占的,代码会往下走,调用runtime_Semacquire,看起来跟互斥锁的runtime_SemacquireMutex很相似,作用应该差不多,把当前协程挂起,等待被唤醒。被唤醒后会再次循环,这个时候锁的占用状态应该为false。
我们来看下调用runtime_Semacquire
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, semaBlockProfile)
}
好像他跟之前的互斥锁最终调用的是一个函数,只是传的第二个参数不同,我们看下互斥锁是怎么调用的
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
1.3 自旋锁原理
其实自旋锁我们再阐述goilang的互斥锁的时候,已经说的差不多,这里直接总结下即可:
二者都是使用信号量来完成实现,只是互斥锁会在锁被占用的情况下,进行block,然后等待被唤醒这需要CPU调度的,而自旋锁则不会block,一直进行空耗CPU忙等,直到锁被释放,好处在于锁被释放时不需要CPU,立刻就能感知。但是自旋锁不适合于单核CPU,因为本协程空耗CPU会影响占用锁的协程或者线程的处理。
1.4 golang读写锁原理
读写所的原理我们应该都了解:读与读之间无影响,读与写之间互斥,写与写之间互斥。
golang实现读写锁,一共有四个函数。RLock和RUnlock是读操作,Lock和Unlock是写操作。
下面上源码:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
const rwmutexMaxReaders = 1 << 30
先看下结构体,首先看到读写锁很明显的需要用到互斥锁Mutex,接着分别是写和读信号量,最后是当前读的个数与当前等待读的个数,注意这个等待个数是针对写来说的,并非针对读。
最后一个是rwmutexMaxReaders最大的同时读的数量,稍后看源码的时候就知道它的用处了。
先看读的lock:
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
先看下读的lock操作,首先尝试把当前锁的读的个数加1,注意看后判断下加1后的读的个数是否小于0,为什么会小于0呢?其实小于0的原因是当前有写操作或者等待的写操作,稍后我们看写的操作的时候就会知道。如果小于0,就调用runtime_Semacquire(&rw.readerSem),哈!这个函数我们再熟悉不过了,就是把当前的协程sleep,等待被唤醒,注意传进去的事读的信号量喔。如果大于等0,那就是没有写,直接return即可。
接着看读unlock:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem)
}
}
if race.Enabled {
race.Enable()
}
}
RUnlock过程稍微有点复杂:
1).首先把当前锁的读的个数减1,然后有一个判断减1后是否小于0,小于我们知道是当前有写或者等待写的协程(实际上不会有正在写的情况,因为写操作会等待读操作完毕后再写,当前既然是读的unlock,那么就代表之前有读,自然不会有正在写),如果大于等于0直接返回即可。
2).继续,如果小于0后进入if,首先会有一个判断,如果为true,则报异常
if r+1 == 0 || r+1 == -rwmutexMaxReaders
r+1==0是什么意思,就是说在上一步减1之前当前读的个数为0,没有读操作而unlock,这肯定是异常了;
r+1==-rwmutexMaxReaders我们放在后面解释写操作的时候解释。
3).接着看,把当前锁的等待读的个数减1,然后判断是否为0,我们在第1)步中已经解释了,进入这个if代表当前有等待写的协程,如果等待读的个数为0就代表没有等待读的协程,可以进行写了,所以代码中调用了runtime_Semrelease(&rw.writerSem),注意这里操作的是写的信号量喔。
ok,我们继续看写操作的Lock
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
1).首先调用rw.w.Lock(),这个互斥锁终于用到了,看起来这个互斥锁只有在写与写之间使用。
2).接着,把锁的读的个数减去rwmutexMaxReaders,rwmutexMaxReaders这是一个极大的值,当前读的个数减去它之后肯定为负了,这也正好解释了我们在看读的操作的时候为什么要判断当前的读的个数是否小于0的原因了,读写之间就是用这样的形式进行通信,然后把减去之后的值再加回来赋值r,r就是当前的读操作的个数。
我们再回头看看第2)步遗留的问题,r+1 == -rwmutexMaxReaders代表,在调用unlock之前readerCount为-rwmutexMaxReaders,它代表的含义是只有写的操作,而没有正在读的操作,所以对读的unlock是异常的。但是这有一个小问题,如果存在以下的情景:
刚开始没有读的操作,先过来一个写的加锁操作,readerCount==-rwmutexMaxReaders,然后紧接着有一个协程A进行读,调用atomic.AddInt32(&rw.readerCount, 1),会使得readerCount==-rwmutexMaxReaders+1,然后阻塞,紧接着一个协程B对锁不进行读lock,直接对读进行unlock,r+1 == -rwmutexMaxReaders+1,就会导致unlock成功。这是一个值得注意的地方,还好,golang有race竞争检测用于帮助开发人员检测出代码的问题。我们看到这些锁的代码中都有race的影子。
3).然后进行如下一个判断,如果为true则调用runtime_Semacquire(&rw.writerSem),释放一个写的信号量,唤醒一个等待写的协程。
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0
r != 0好理解,当前有读的操作,这是一个&&操作,r!=0满足后,才会进行atomic.AddInt32(&rw.readerWait, r),把当前读锁的个数加到等待读的个数中,为什么还要判断加之后的值是否为0呢?看起来像是一定大于0呀,其实不是的,还是有可能等于0的。该情景如下:
有一个协程A调用读操作,这时readerCount==1,读完后调用RUnlock,代码刚进入函数还没怎么走,这时,刚好有一个协程B进行读操作,刚好走完下面这行代码
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
此时readerCount==-rwmutexMaxReaders+1
然后A协程的RUnlock继续往下走,刚好走完如下这行代码(真XX巧,哈哈)
if atomic.AddInt32(&rw.readerWait, -1) == 0
这时,你会发现readerWait==-1,然后呢,协程B不甘示弱,要走这行我们关注的代码了
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0
此时r==1,readerWait==-1,相加是不是等于0了,这代表什么,唯一的读操作完毕了,写操作你不用阻塞了,直接lock成功,好神奇。
ok,我们继续看写操作的Unlock源码
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Release(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
1).首先把之前减的rwmutexMaxReaders加回来,这是readerCount就是当前读的个数,注意这个是有在写操作的这段时间,新加入的读的个数,这些读操作都陷入了阻塞。把加之后的readerCount赋值给r。
2).接下来会有一个异常判断
if r >= rwmutexMaxReaders
什么情况下为true,当然是之前我没有lock过,readerCount没减过rwmutexMaxReaders,你加了之后会大于它。
3).进入一个循环,这些因为写而陷入阻塞的读操作全部唤醒
4).然后释放互斥锁
至此,所有的锁的源码,我们都阅读完毕。实际上我们还没解答为什么golang的socket为什么要加锁呢?这个我们再稍后会解释。