当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的主要作用就是保证多个线程或者 Goroutine 在访问同一片内存时不会出现混乱的问题,锁其实是一种并发编程中的同步原语(Synchronization Primitives)。

MutexRWMutexWaitGroupOnceCondErrGroupSemaphoreSingleFlight

基本原语

MutexRWMutexOnceWaitGroup

MutexRWMutexOnceWaitGroupCondMapPool

Mutex

syncstatesemastatesema
type Mutex 

状态

int32mutexLockedmutexWokenmutexStarving

0mutexLocked1mutexWoken1mutexStarving

饥饿模式

Mutex
Lock

在饥饿模式中,互斥锁会被直接交给等待队列最前面的 Goroutine,新的 Goroutine 在这时不能获取锁、也不会进入自旋的状态,它们只会在队列的末尾等待,如果一个 Goroutine 获得了互斥锁并且它是队列中最末尾的协程或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。

Mutex

加锁

MutexLockLock0mutexLocked1
func (m *Mutex) Lock() {
LockMutex0lockSlowfor
func (m *Mutex) lockSlow() {

在这段方法的第一部分会判断当前方法能否进入自旋来等待锁的释放,自旋(Spinnig)其实是在多线程同步的过程中使用的一种机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真,在多核的 CPU 上,自旋的优点是避免了 Goroutine 的切换,所以如果使用恰当会对性能带来非常大的增益。

Mutexruntime_canSpin
P
runtime_doSpinprocyieldPAUSEPAUSE30PAUSE
0
statemutexLockedmutexStarvingmutexWokenmutexWaiterShift
new := old
atomicruntime_SemacquireMutex
if atomic.CompareAndSwapInt32(&m.state, old, 
runtime_SemacquireMutexMutexMutexruntime_SemacquireMutexgoparkunlock
LockstarvationThresholdNs(1ms)

解锁

UnlockatomicAddInt320unlockSlow
func (m *Mutex) Unlock() {
unlockSlowsync: unlock of unlocked mutex
func (m *Mutex) unlockSlow(new int32) {
runtime_SemreleasemutexLockedmutexStarving
0runtime_Semrelease

小结

Mutex
mutexLockedmutexLockedPAUSE1msruntime_SemacquireMutexLock1ms

互斥锁的解锁过程相对来说就比较简单,虽然对于普通模式和饥饿模式的处理有一些不同,但是由于代码行数不多,所以逻辑清晰,也非常容易理解:

UnlockmutexLockedruntime_Semrelease

RWMutex

syncRWMutex
YN
NN
RWMutex
type RWMutex 
readerCountreaderWait

读锁

atomic.AddInt32readerCountruntime_SemacquireMutex
func (rw *RWMutex) RLock() {
readerCountRUnlock
func (rw *RWMutex) RUnlock() {
readerCountrUnlockSlowreaderWaitwriterSem
func (rw *RWMutex) rUnlockSlow(r int32) {
writerSem

读写锁

LockLockMutexLockatomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
func (rw *RWMutex) Lock() {
runtime_SemacquireMutexwriterSem
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
func (rw *RWMutex) Unlock() {
RWMutex

小结

MutexRWMutexMutex
readerSemwriterSemwreaderCountrwmutexMaxReadersreaderWaitLockRUnlockUnlock
RWMutexMutex

WaitGroup

WaitGroupsync
requests := []*Request{...}

wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(r *Request) {
defer wg.Done()

// res, err := service.call(r)
}(request)
}

wg.Wait()
WaitGroupWait

Done

结构体

WaitGroupnoCopyWaitGroup
type WaitGroup 
noCopysync
package main
fmt.Println
noCopyWaitGroup

WaitGroupstatestate1

操作

WaitGroupAddWaitDoneDonewg.Add(-1)
func (wg *WaitGroup) Add(delta int) {
AddWaitGroupcounterAddWaitGroupAddruntime_Semrelease
WaitGroupWait0waiterruntime_Semacquire
func (wg *WaitGroup) Wait() {
Add0

小结

WaitGroup
AddWaitWaitGroupWaitDoneAddAddWaitGroup

Once

syncOnceOnce
Doonly once
func main() {
syncOnceOncedoneMutex
type Once 
OnceDoatomic.LoadUint32doSlow
func (o *Once) Do(f func()) {
doSlowdeferdone1panicdone1

小结

Onceatomic
DopanicDo

Cond

CondCondCondLCond
func main() {
WaitBroadcastBoardcast"listen"

结构体

CondnoCopycopyCheckerCondpanicLLockerLockUnlockNewCond
type Cond 
notifyListCondGoroutine
type notifyList 
headtailwaitnotify

操作

CondWaitruntime_notifyListAdd+1runtime_notifyListWait
func (c *Cond) Wait() {
notifyListWaitnotifyList

func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)

if less(t, l.notify) {
unlock(&l.lock)
return
}

s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
goparkunlock

CondSignalBroadcastWait
func (c *Cond) Signal() {
notifyListNotifyAllreadyWithTimegoready
func notifyListNotifyAll(l *notifyList) {
goready
notifyListNotifyOnesudogsudog.ticket == l.notifyreadyWithTime
func notifyListNotifyOne(l *notifyList) {
WaitSignalBroadcast

小结

MutexCondSignalBroadcastfor {}Cond
WaitL.LockpanicSignalBroadcast

扩展原语

x/syncErrGroupSemaphoreSingleFlightSyncMapSyncMapsyncsync.Mapx/syncsync

ErrGroupSemaphoreSingleFlight

ErrGroup

x/sync
var g errgroup.Group
GoWaitGonil

结构体

errgroupGroup
ContextcancelcontextWaitGrouperrerrerrOnce
type Group 
Group

操作

errgroupWithContextContextGroupWithCancelGroup
func WithContext(ctx context.Context) (*Group, context.Context) {
GoWaitGroupcancelerr
func (g *Group) Go(f func() error) {
WaitWaitGroupContext

小结

errgroupGroup
Contextcancel

Semaphore

0

Golang 的扩展包中就提供了带权重的信号量,我们可以按照不同的权重对资源的访问进行管理,这个包对外也只提供了四个方法:

NewWeightedAcquireTryAcquirefalseRelease

结构体

NewWeightedWeighted
func NewWeighted(n int64) *Weighted {
Weightedwaiterscur[0, size]

信号量中的计数器会随着用户对资源的访问和释放进行改变,引入的权重概念能够帮助我们更好地对资源的访问粒度进行控制,尽可能满足所有常见的用例。

获取

Acquire
Weightedselect
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
TryAcquiretruefalse
func (s *Weighted) TryAcquire(n int64) bool {
AcquireTryAcquire

释放

ReleaseReleasewaiters
func (s *Weighted) Release(n int64) {
AcquireContext

小结

带权重的信号量确实有着更多的应用场景,这也是 Go 语言对外提供的唯一一种信号量实现,在使用的过程中我们需要注意以下的几个问题:

AcquireTryAcquireReleaseRelease

SingleFlight

singleflight 是 Go 语言扩展包中提供了另一种同步原语,这其实也是作者最喜欢的一种同步扩展机制,它能够在一个服务中抑制对下游的多次重复请求,一个比较常见的使用场景是 — 我们在使用 Redis 对数据库中的一些热门数据进行了缓存并设置了超时时间,缓存超时的一瞬间可能有非常多的并行请求发现了 Redis 中已经不包含任何缓存所以大量的流量会打到数据库上影响服务的延时和质量。

singleflightKeyKey

singleflightsingleflightsingleflight.Group{}GroupDo
type service 
Do

结构体

GroupMutexKeycallcall
type Group 
callvalerrWaitGroupdupschanssingleflight

操作

singleflightDoDoChan
DoGroupkey
callcallWaitGroupcallMutexdoCallcalldupsMutexWaitGroup.Wait
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
valerrdoCalldoCallWaitGroup.WaitDo
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
doCallfnc.valc.errWaitGroup.DonecalldoCallDoChan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) chan 
DoChanDodoCallcallchanschan Result

小结

singleflightGroup
DoDoChanForgetsingleflight

总结

我们在这一节中介绍了 Go 语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用 Go 语言的特性构建高吞吐量、低延时的服务,并解决由于并发带来的错误,到这里我们再重新回顾一下这一节介绍的内容:

MutexmutexLockedmutexLockedPAUSE1msruntime_SemacquireMutexLock1msUnlockmutexLockedruntime_SemreleaseRWMutexreaderSemwriterSemwreaderCountrwmutexMaxReadersreaderWaitLockRUnlockUnlockWaitGroupAddWaitWaitGroupWaitDoneAddAddWaitGroupOnceDopanicDoCondWaitL.LockpanicSignalBroadcastErrGroupContextcancelSemaphoreAcquireTryAcquireReleaseReleaseSingleFlightDoDoChanForgetsingleflight

这些同步原语的实现不仅要考虑 API 接口的易用、解决并发编程中可能遇到的线程竞争问题,还需要对尾延时进行优化避免某些 Goroutine 无法获取锁或者资源而被饿死,对同步原语的学习也能够增强我们队并发编程的理解和认识,也是了解并发编程无法跨越的一个步骤。

推荐阅读

  • 学会使用context取消goroutine执行的方法

  • 聊聊在Go语言里使用继承的翻车经历