一、锁的基础知识

1. 互斥量/互斥锁

互斥量(Mutex), 又称为互斥锁, 是一种用来保护临界区的特殊变量, 它可以处于锁定(locked) 状态, 也可以处于解锁(unlocked) 状态。

在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。

2. CAS(compare and swap)

解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。"

CAS机制执行流程:

CAS存在的问题
1. ABA问题
CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化就更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了 —> 这就是所谓的ABA问题。

ABA问题的解决思路其实也很简单,就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A了。

2. 循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销

3. 只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。

3. 自旋锁

自旋锁与互斥锁比较类似,它们都是为了解决对某项资源的互斥使用。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋"一词就是因此而得名。

自旋锁可能存在的2个问题:

  1. 试图递归地获得自旋锁必然会引起死锁:递归程序的持有实例在第二个实例循环,以试图获得相同自旋锁时,不会释放此自旋锁。
    在递归程序中使用自旋锁应遵守下列策略:递归程序决不能在持有自旋锁时调用它自己,也决不能在递归调用时试图获得相同的自旋锁。

  2. 过多占用cpu资源。如果不加限制,由于申请者一直在循环等待,因此自旋锁在锁定的时候,如果不成功,不会睡眠,会持续的尝试,单cpu的时候自旋锁会让其它process动不了. 因此,一般自旋锁实现会有一个参数限定最多持续尝试次数. 超出后, 自旋锁放弃当前time slice. 等下一次机会。

由此可见,自旋锁比较适用于锁使用者保持锁时间比较短的情况。正是由于自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是睡眠是非常必要的,自旋锁的效率远高于互斥锁。

4. 读写锁

读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。

这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。

在读写锁保持期间也是抢占失效的。

如果读写锁当前没有读者,也没有写者,那么写者可以立刻获得读写锁,否则它必须自旋在那里,直到没有任何写者或读者。如果读写锁没有写者,那么读者可以立即获得该读写锁,否则读者必须自旋在那里,直到写者释放该读写锁。

5. 乐观锁 & 悲观锁

乐观锁其实主要就是一种思想,因为乐观锁的操作过程中其实没有没有任何锁的参与,乐观锁只是和悲观锁相对,严格的说乐观锁不能称之为锁。

悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞,直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。
乐观锁:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,只在更新的时候会判断一下在此期间别人有没有去更新这个数据。

乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。

乐观锁常见的两种实现方式:
1. 版本号机制
CAS机制保证了在更新数据的时候没有被修改为其他数据的同步机制,版本机制就保证了没有被修改过的同步机制

2. CAS机制
当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败。

6. 死锁

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

虽然进程在运行过程中,可能发生死锁,但死锁的发生也必须具备一定的条件,死锁的发生必须具备以下四个必要条件

  • 互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
  • 请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
  • 不剥夺条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
  • 环路等待条件:指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。
二、go中锁机制

在 Golang 里有专门的方法来实现锁,就是 sync 包,这个包有两个很重要的锁类型。一个叫 Mutex, 利用它可以实现互斥锁。一个叫 RWMutex,利用它可以实现读写锁。

sync.MutexLock()sync.RWMutexRLock()Lock()RLock()mu.RLockLock()

1. Mutex-互斥锁

Mutex 的实现主要借助了 CAS 指令 + 自旋 + 信号量
数据结构:

type Mutex struct {
    state int32
    sema  uint32
}

上述两个加起来只占 8 字节空间的结构体表示了 Go语言中的互斥锁

状态
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • 1位表示是否被锁定
  • 1位表示是否有协程已经被唤醒
  • 1位表示是否处于饥饿状态
  • 剩下29位表示阻塞的协程数

正常模式和饥饿模式
正常模式:正常模式下waiter都是先入先出,在队列中等待的waiter被唤醒后不会直接获取锁,因为要和新来的goroutine 进行竞争,新来的goroutine相对于被唤醒的waiter是具有优势的,新的goroutine 正在cpu上运行,被唤醒的waiter还要进行调度才能进入状态,所以在并发的情况下waiter大概率抢不过新来的goroutine,这个时候waiter会被放到队列的头部,如果等待的时间超过了1ms,这个时候Mutex就会进入饥饿模式。

饥饿模式:当Mutex进入饥饿模式之后,锁的所有权会从解锁的goroutine移交给队列头部的goroutine,这几个时候新来的goroutine会直接放入队列的尾部,这样很好的解决了老的goroutine一直抢不到锁的场景。

对于两种模式,正常模式下的性能是最好的,goroutine可以连续多次获取锁,饥饿模式解决了取锁公平的问题,但是性能会下降,其实是性能和公平的一个平衡模式。所以在lock的源码里面,当队列只剩本省goroutine一个并且等待时间没有超过1ms,这个时候Mutex会重新恢复到正常模式。

Lock函数

// 加锁
// 如果锁已经被使用,调用goroutine阻塞,直到锁可用
func (m *Mutex) Lock() {
    // 快速路径:没有竞争直接获取到锁,修改状态位为加锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 开启-race之后会进行判断,正常情况可忽略
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 慢路径(以便快速路径可以内联)
    m.lockSlow()
}

来看看Lock函数,分为两个部分, 快速路径,先通过CAS尝试直接获取锁,如果能获取到直接返回,否则进入慢路径的方法,这里的代码注释提到了内联

tips:方法内联
简单的说方法内联就是将被调用方函数代码“复制”到调用方函数中,减少函数调用开销,在2018年之前的go版本中,所有的逻辑都在Lock函数中,并没有拆出来,2018年之后Go开发者将slow path拆出来,当lock方法被频繁调用的时候,有两种情况,如果直接获得锁走的是fast path,这个时候内联就只有fast path 的代码,这样会减少方法调用的堆栈空间和时间的消耗 ,如果处于自旋,锁竞争的情况下,走的是slow path,这个时候才会把lock slow 的方法内联进来,这样方便了编译器做内联。

lockSlow 函数

func (m *Mutex) lockSlow() {
    var waitStartTime int64 //记录请求锁的初始时间
    starving := false //饥饿标记
    awoke := false //唤醒标记
    iter := 0 //自旋次数
    old := m.state  //当前锁的状态
    for {
        //锁处于正常模式还没有释放的时候,尝试自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {        
            //在临界区耗时很短的情况下提高性能
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 
            && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            //更新锁的状态
            old = m.state
            continue
        }
        new := old
        // 非饥饿状态进行加锁
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        // 等待着数量+1
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        
        // 加锁的情况下切换为饥饿模式
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        //goroutine 唤醒的时候进行重置标志
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
                
        // 设置新的状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break 
            }
            // 判断是不是第一次加入队列
            // 如果之前就在队列里面等待了,加入到队头
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 阻塞等待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 检查锁是否处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果锁处于饥饿状态,直接抢到锁
            if old&mutexStarving != 0 {
            
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 设置标志,进行加锁并且waiter-1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 如果是最后一个的话清除饥饿标志
                if !starving || old>>mutexWaiterShift == 1 {
                    // 退出饥饿模式               
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
    // -race开启检测冲突,可以忽略
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

Unlock函数

//如果对没有lock 的Mutex进行unlock会报错
//unlock和goroutine是没有绑定的,对于一个Mutex,可以一个goroutine加锁,另一个goroutine进行解锁
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 快速之路,直接解锁,去除加锁位的标记
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // 解锁失败进入慢路径
        //同样的对慢路径做了单独封装,便于内联
        m.unlockSlow(new)
    }
}

unlockSlow函数

func (m *Mutex) unlockSlow(new int32) {
    //解锁一个未加锁的Mutex会报错(可以想想为什么,Mutex使用状态位进行标记锁的状态的)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            //正常模式下,没有waiter或者在处理事情的情况下直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            //如果有等待者,设置mutexWoken标志,waiter-1,更新state
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式下会直接将mutex交给下一个等待的waiter,让出时间片,以便waiter执行
        runtime_Semrelease(&m.sema, true, 1)
    }
}

同样的,在unlock也有fastpath和slowpath,fastpath尝试解锁,解锁成功就返回,否则进入slowpath,slowpath分为正常模式的处理和饥饿模式的处理,饥饿模式直接将锁的控制权交给队列中等待的waiter,正常模式分两种情况 如果当前没有waiter,只有自己本身,直接解锁返回,如果有waiter,解锁后唤醒下个等待者。

2. RWMutex-读写锁

RWMutexreaderwriterreaderreaderwriterwriter

RWMutex 对外暴露的方法有五个:

RLock()RUnlock()Lock()Unlock()RLocker()RLock()
2.1 RWMutex流程概览
RWMutex

队列 A 最多只允许有 一个writer,如果有其他 writer,需要在 队列B 等待;
当一个 writer 到了 队列A 后,只允许它 之前的reader 执行读操作,新来的 reader 需要在 队列A 后面排队;
当前面的 reader 执行完读操作之后,writer 执行写操作;
writer 执行完写操作后,让 后面的reader 执行读操作,再唤醒队列B 的一个 writer 到 队列A 后面排队。

初始时刻 队列A 中 writer W1 前面有三个 reader,后面有两个 reader,队列B中有两个 writer


并发读 多个 reader 可以同时获取到读锁,进入临界区进行读操作;writer W1 在 队列A 中等待,同时又来了两个 reader,直接在 队列A 后面排队

写操作 W1 前面所有的 reader 完成后,W1 获得锁,进入临界区操作

获得准入权 W1 完成写操作退出,先让后面排队的 reader 进行读操作,然后从 队列B 中唤醒 W2 到 队列A 排队。W2 从 队列B 到 队列A 的过程中,R8 先到了 队列A,因此 R8 可以执行读操作。R9、R10、R11 在 W2 之后到的,所以在后面排队;新来的 W4 直接在队列B 排队。

从上面的示例可以看出,
RWMutex
可以看作是没有优先级,按照先来先到的顺序去执行,只不过是 多个reader 可以 并行去执行罢了。
2.2 写锁饥饿问题

因为读锁是共享的,所以如果当前已经有读锁,那后续goroutine继续加读锁正常情况下是可以加锁成功,但是如果一直有读锁进行加锁,那尝试加写锁的goroutine则可能会长期获取不到锁,这就是因为读锁而导致的写锁饥饿问题

go通过引入以下特性避免出现写锁饥饿:

  • 当写锁阻塞时,新的读锁是无法申请的
sync.RWMutexmx.Lock()

这种特性可以有效防止写锁饥饿。如果一个线程因为某种原因,导致长时间得不到CPU时间片,这种状态被称之为饥饿

2.3. golang的读写锁源码剖析

成员变量


结构体
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // 用于writer等待读完成排队的信号量
    readerSem   uint32 // 用于reader等待写完成排队的信号量
    readerCount int32  // 读锁的计数器
    readerWait  int32  // 等待读锁释放的数量
}

写锁计数
读写锁中允许加读锁的最大数量是4294967296,在go里面对写锁的计数采用了负值进行,通过递减最大允许加读锁的数量从而进行写锁对读锁的抢占

const rwmutexMaxReaders = 1 << 30
2.3.1 读锁实现

读锁加锁逻辑

func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 累加reader计数器,如果小于0则表明有writer正在等待
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 当前有writer正在等待读锁,读锁就加入排队
        runtime_SemacquireMutex(&rw.readerSem, false)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}

读锁释放逻辑

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    // 如果小于0,则表明当前有writer正在等待
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
            race.Enable()
            throw("sync: RUnlock of unlocked RWMutex")
        }
        // 将等待reader的计数减1,证明当前是已经有一个读的,如果值==0,则进行唤醒等待的
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
    if race.Enabled {
        race.Enable()
    }
}
2.3.2 写锁实现

加写锁实现

func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 首先获取mutex锁,同时多个goroutine只有一个可以进入到下面的逻辑
    rw.w.Lock()
    // 对readerCounter进行进行抢占,通过递减rwmutexMaxReaders允许最大读的数量
    // 来实现写锁对读锁的抢占
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 记录需要等待多少个reader完成,如果发现不为0,则表明当前有reader正在读取,当前goroutine
    // 需要进行排队等待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}

释放写锁

func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }

    // 将reader计数器复位,上面减去了一个rwmutexMaxReaders现在再重新加回去即可复位
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // 唤醒所有的读锁
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    // 释放mutex
    rw.w.Unlock()
    if race.Enabled {
        race.Enable()
    }
}
2.3.3 关键核心机制

写锁对读锁的抢占
加写锁的抢占

    // 在加写锁的时候通过将readerCount递减最大允许加读锁的数量,来实现对加读锁的抢占
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

加读锁的抢占检测

// 如果没有写锁的情况下读锁的readerCount进行Add后一定是一个>0的数字,这里通过检测值为负数
//就实现了读锁对写锁抢占的检测
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false)
    }

写锁抢占读锁后后续的读锁就会加锁失败,但是如果想加写锁成功还要继续对已经加读锁成功的进行等待

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 写锁发现需要等待的读锁释放的数量不为0,就自己自己去休眠了
        runtime_SemacquireMutex(&rw.writerSem, false)
}

写锁既然休眠了,则必定要有一种唤醒机制其实就是每次释放锁的时候,当检查到有加写锁的情况下,就递减readerWait,并由最后一个释放reader lock的goroutine来实现唤醒写锁

if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // The last reader unblocks the writer.
            runtime_Semrelease(&rw.writerSem, false)
}
3. 常见问题
  1. 不可复制
    和 Mutex 一样,RWMutex 也是不可复制。不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。
  2. 不可重入
    不可重入的原因是,获得锁之后,还没释放锁,又申请锁,这样有可能造成死锁。比如 reader A 获取到了读锁,writer B 等待 reader A 释放锁,reader 还没释放锁又申请了一把锁,但是这把锁申请不成功,他需要等待 writer B。这就形成了一个循环等待的死锁。
  3. 加锁和释放锁一定要成对出现,不能忘记释放锁,也不能解锁一个未加锁的锁。

References:
https://blog.csdn.net/nrsc272420199/article/details/105032873
https://cloud.tencent.com/developer/article/1700079
https://blog.csdn.net/qq_37935909/article/details/108625508
https://www.jianshu.com/p/766093f59687
https://blog.csdn.net/weixin_43580319/article/details/117048449
https://geektutu.com/post/hpg-mutex.html
https://zhuanlan.zhihu.com/p/260205701
https://zhuanlan.zhihu.com/p/98443808
https://www.jianshu.com/p/679041bdaa39
https://blog.csdn.net/hudeyong926/article/details/126467118
https://www.cnblogs.com/CJ-cooper/p/15477232.html
https://mp.weixin.qq.com/s?__biz=MzU5NzU2NDk2MA==&mid=2247485176&idx=1&sn=31ca207428c1c8170f714c3771759279&chksm=fe50cdb7c92744a107aaa6eff17e7e62fff3318adfb664234ae2dc6254f3eb57b6c6b491b8d4#rd
https://studygolang.com/topics/15456