当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的主要作用就是保证多个线程或者 Goroutine 在访问同一片内存时不会出现混乱的问题,锁其实是一种并发编程中的同步原语(Synchronization Primitives)。
MutexRWMutexWaitGroupOnceCondErrGroupSemaphoreSingleFlight
基本原语
MutexRWMutexOnceWaitGroup

MutexRWMutexOnceWaitGroupCondMapPool
Mutex
syncstatesemastatesema
type Mutex
状态
int32mutexLockedmutexWokenmutexStarving

0mutexLocked1mutexWoken1mutexStarving
饥饿模式
Mutex
Lock

在饥饿模式中,互斥锁会被直接交给等待队列最前面的 Goroutine,新的 Goroutine 在这时不能获取锁、也不会进入自旋的状态,它们只会在队列的末尾等待,如果一个 Goroutine 获得了互斥锁并且它是队列中最末尾的协程或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
Mutex
加锁
MutexLockLock0mutexLocked1
func (m *Mutex) Lock() {
LockMutex0lockSlowfor
func (m *Mutex) lockSlow() {
在这段方法的第一部分会判断当前方法能否进入自旋来等待锁的释放,自旋(Spinnig)其实是在多线程同步的过程中使用的一种机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真,在多核的 CPU 上,自旋的优点是避免了 Goroutine 的切换,所以如果使用恰当会对性能带来非常大的增益。
Mutexruntime_canSpin
P
runtime_doSpinprocyieldPAUSEPAUSE30PAUSE
0
statemutexLockedmutexStarvingmutexWokenmutexWaiterShift
new := old
atomicruntime_SemacquireMutex
if atomic.CompareAndSwapInt32(&m.state, old,
runtime_SemacquireMutexMutexMutexruntime_SemacquireMutexgoparkunlock
LockstarvationThresholdNs(1ms)
解锁
UnlockatomicAddInt320unlockSlow
func (m *Mutex) Unlock() {
unlockSlowsync: unlock of unlocked mutex
func (m *Mutex) unlockSlow(new int32) {
runtime_SemreleasemutexLockedmutexStarving
0runtime_Semrelease
小结
Mutex
mutexLockedmutexLockedPAUSE1msruntime_SemacquireMutexLock1ms
互斥锁的解锁过程相对来说就比较简单,虽然对于普通模式和饥饿模式的处理有一些不同,但是由于代码行数不多,所以逻辑清晰,也非常容易理解:
UnlockmutexLockedruntime_Semrelease
RWMutex
syncRWMutex
读 | 写 | |
---|---|---|
读 | Y | N |
写 | N | N |
RWMutex
type RWMutex
readerCountreaderWait
读锁
atomic.AddInt32readerCountruntime_SemacquireMutex
func (rw *RWMutex) RLock() {
readerCountRUnlock
func (rw *RWMutex) RUnlock() {
readerCountrUnlockSlowreaderWaitwriterSem
func (rw *RWMutex) rUnlockSlow(r int32) {
writerSem
读写锁
LockLockMutexLockatomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
func (rw *RWMutex) Lock() {
runtime_SemacquireMutexwriterSem
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
func (rw *RWMutex) Unlock() {
RWMutex
小结
MutexRWMutexMutex
readerSemwriterSemwreaderCountrwmutexMaxReadersreaderWaitLockRUnlockUnlock
RWMutexMutex
WaitGroup
WaitGroupsync
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
WaitGroupWait

Done
结构体
WaitGroupnoCopyWaitGroup
type WaitGroup
noCopysync
package main
fmt.Println
noCopyWaitGroup

WaitGroupstatestate1
操作
WaitGroupAddWaitDoneDonewg.Add(-1)
func (wg *WaitGroup) Add(delta int) {
AddWaitGroupcounterAddWaitGroupAddruntime_Semrelease
WaitGroupWait0waiterruntime_Semacquire
func (wg *WaitGroup) Wait() {
Add0
小结
WaitGroup
AddWaitWaitGroupWaitDoneAddAddWaitGroup
Once
syncOnceOnce
Doonly once
func main() {
syncOnceOncedoneMutex
type Once
OnceDoatomic.LoadUint32doSlow
func (o *Once) Do(f func()) {
doSlowdeferdone1panicdone1
小结
Onceatomic
DopanicDo
Cond
CondCondCondLCond
func main() {
WaitBroadcastBoardcast"listen"

结构体
CondnoCopycopyCheckerCondpanicLLockerLockUnlockNewCond
type Cond
notifyListCondGoroutine
type notifyList
headtailwaitnotify
操作
CondWaitruntime_notifyListAdd+1runtime_notifyListWait
func (c *Cond) Wait() {
notifyListWaitnotifyList
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
if less(t, l.notify) {
unlock(&l.lock)
return
}
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
goparkunlock

CondSignalBroadcastWait
func (c *Cond) Signal() {
notifyListNotifyAllreadyWithTimegoready
func notifyListNotifyAll(l *notifyList) {
goready
notifyListNotifyOnesudogsudog.ticket == l.notifyreadyWithTime
func notifyListNotifyOne(l *notifyList) {
WaitSignalBroadcast
小结
MutexCondSignalBroadcastfor {}Cond
WaitL.LockpanicSignalBroadcast
扩展原语
x/syncErrGroupSemaphoreSingleFlightSyncMapSyncMapsyncsync.Mapx/syncsync

ErrGroupSemaphoreSingleFlight
ErrGroup
x/sync
var g errgroup.Group
GoWaitGonil
结构体
errgroupGroup
ContextcancelcontextWaitGrouperrerrerrOnce
type Group
Group
操作
errgroupWithContextContextGroupWithCancelGroup
func WithContext(ctx context.Context) (*Group, context.Context) {
GoWaitGroupcancelerr
func (g *Group) Go(f func() error) {
WaitWaitGroupContext
小结
errgroupGroup
Contextcancel
Semaphore
0
Golang 的扩展包中就提供了带权重的信号量,我们可以按照不同的权重对资源的访问进行管理,这个包对外也只提供了四个方法:
NewWeightedAcquireTryAcquirefalseRelease
结构体
NewWeightedWeighted
func NewWeighted(n int64) *Weighted {
Weightedwaiterscur[0, size]

信号量中的计数器会随着用户对资源的访问和释放进行改变,引入的权重概念能够帮助我们更好地对资源的访问粒度进行控制,尽可能满足所有常见的用例。
获取
Acquire
Weightedselect
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
TryAcquiretruefalse
func (s *Weighted) TryAcquire(n int64) bool {
AcquireTryAcquire
释放
ReleaseReleasewaiters
func (s *Weighted) Release(n int64) {
AcquireContext
小结
带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:
AcquireTryAcquireReleaseRelease
SingleFlight
singleflight 是 Go 语言扩展包中提供了另一种同步原语,这其实也是作者最喜欢的一种同步扩展机制,它能够在一个服务中抑制对下游的多次重复请求,一个比较常见的使用场景是 — 我们在使用 Redis 对数据库中的一些热门数据进行了缓存并设置了超时时间,缓存超时的一瞬间可能有非常多的并行请求发现了 Redis 中已经不包含任何缓存所以大量的流量会打到数据库上影响服务的延时和质量。

singleflightKeyKey

singleflightsingleflightsingleflight.Group{}GroupDo
type service
Do
结构体
GroupMutexKeycallcall
type Group
callvalerrWaitGroupdupschanssingleflight
操作
singleflightDoDoChan
DoGroupkey
callcallWaitGroupcallMutexdoCallcalldupsMutexWaitGroup.Wait
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
valerrdoCalldoCallWaitGroup.WaitDo
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
doCallfnc.valc.errWaitGroup.DonecalldoCallDoChan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) chan
DoChanDodoCallcallchanschan Result
小结
singleflightGroup
DoDoChanForgetsingleflight
总结
我们在这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务,并解决由于并发带来的错误,到这里我们再重新回顾一下这一节介绍的内容:
MutexmutexLockedmutexLockedPAUSE1msruntime_SemacquireMutexLock1msUnlockmutexLockedruntime_SemreleaseRWMutexreaderSemwriterSemwreaderCountrwmutexMaxReadersreaderWaitLockRUnlockUnlockWaitGroupAddWaitWaitGroupWaitDoneAddAddWaitGroupOnceDopanicDoCondWaitL.LockpanicSignalBroadcastErrGroupContextcancelSemaphoreAcquireTryAcquireReleaseReleaseSingleFlightDoDoChanForgetsingleflight
这些同步原语的实现不仅要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还需要对尾延时进行优化避免某些 Goroutine 无法获取锁或者资源而被饿死,对同步原语的学习也能够增强我们队并发编程的理解和认识,也是了解并发编程无法跨越的一个步骤。
推荐阅读
学会使用context取消goroutine执行的方法
聊聊在Go语言里使用继承的翻车经历