概述
我们前面讲过 操作系统的信号量,以及 golang中的Mutex原理解析,就抛出了一个问题,操作系统的信号量的管理对象是线程,而 Mutex 中使用的信号量是针对协程的,那么这就意味着golang需要重新实现一套基于协程的信号量,随着对golang源码的研究,我发现golang的 runtime 就像一个微型的操作系统,功能非常强大。
go version go1.18.3 windows/amd64
// src/runtime/sema.go
// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// https://swtch.com/semaphore.pdf
具体的用法是提供 sleep 和 wakeup 原语
以使其能够在其它同步原语中的竞争情况下使用
因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
只不过语义上更简单一些也就是说,不要认为这些是信号量
把这里的东西看作 sleep 和 wakeup 实现的一种方式
每一个 sleep 都会和一个 wakeup 配对
即使在发生 race 时,wakeup 在 sleep 之前时也是如此
futexfutex
futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。
Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。
semaphorefutexsleepwakeupgoroutinewakeup
主要源码
// src/sync/runtime.go
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// ----------------------------------------------------------------
// src/runtime/sema.go
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
// 获取当前协程
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case.
if cansemacquire(addr) {
return
}
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
cansemacquire()runtime/internal/atomic
//go:noescape
func Cas(ptr *uint32, old, new uint32) bool
// src/runtime/internal/atomic/atomic_amd64.s
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET
atomic.Cas(addr, v, v-1)v-1 < 0
cansemacquire()
Easy case 和 Harder caseFast path 和 slow path
skipframereleasetimeacquiretime
数据结构
semtable, semaRoot, sudug
addr 为一个信号量的地址,在一个程序中可能存在多个信号量,那么这些 addr 会被放入 semtable 数组中,采用取模的方式,semtable 长度为251,在声明的时候就做了初始化,每一个元素中包含一个 semaRoot,而 semaRoot 中包含一个平衡二叉数结构,用来存储着竞争信号量的协程 sudug。
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
root.queue(addr) 和 root.dequeue(addr)
lifofifo
(parent, prev, next)(waitlink, waittail)
waittail
关于sleep和wakeup协程
与线程的挂起和唤醒原理类似,在前面成功的将协程加入到 semaRoot 之后,只需要将协程的状态设置为 Gwaiting 就可以实现挂起,而唤醒的过程是将其移出 semaRoot ,修改状态,加入到就绪队列。
// src/runtime/sema.go
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}
// src/runtime/proc.go
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
// goyield is like Gosched, but it:
// - emits a GoPreempt trace event instead of a GoSched trace event
// - puts the current G on the runq of the current P instead of the globrunq
func goyield() {
checkTimeouts()
mcall(goyield_m)
}
readyWithTime()
goyield()
runtime.Gosched()
关于lock
在对二叉树做操作的时候肯定是要加锁的,显然这个锁是要加在 semaRoot 上的,而采用 semtable 分散化在一定程度上可以降低锁的粒度。
golang通过 sema 来实现 sync.Mutex,然后在实现 sema 的时候又用了 mutex,那么这里的 mutex 是什么呢?
相关函数
// Mutual exclusion locks. In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
// Initialization is helpful for static lock ranking, but not required.
type mutex struct {
// Empty struct if lock ranking is disabled, otherwise includes the lock rank
lockRankStruct
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr
}
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
lockWithRank(&root.lock, lockRankRoot)
unlock(&root.lock)
// src/runtime/lock_sema.go
func lock2(l *mutex) {
gp := getg()
if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
gp.m.locks++
// Speculative grab for lock.
if atomic.Casuintptr(&l.key, 0, locked) {
return
}
semacreate(gp.m)
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin := 0
if ncpu > 1 {
spin = active_spin
}
Loop:
for i := 0; ; i++ {
v := atomic.Loaduintptr(&l.key)
if v&locked == 0 {
// Unlocked. Try to lock.
if atomic.Casuintptr(&l.key, v, v|locked) {
return
}
i = 0
}
if i < spin {
procyield(active_spin_cnt)
} else if i < spin+passive_spin {
osyield()
} else {
// Someone else has it.
// l->waitm points to a linked list of M's waiting
// for this lock, chained through m->nextwaitm.
// Queue this M.
for {
gp.m.nextwaitm = muintptr(v &^ locked)
if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
break
}
v = atomic.Loaduintptr(&l.key)
if v&locked == 0 {
continue Loop
}
}
if v&locked != 0 {
// Queued. Wait.
semasleep(-1)
i = 0
}
}
}
}
//go:nowritebarrier
// We might not be holding a p in this code.
func unlock2(l *mutex) {
gp := getg()
var mp *m
for {
v := atomic.Loaduintptr(&l.key)
if v == locked {
if atomic.Casuintptr(&l.key, locked, 0) {
break
}
} else {
// Other M's are waiting for the lock.
// Dequeue an M.
mp = muintptr(v &^ locked).ptr()
if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {
// Dequeued an M. Wake it.
semawakeup(mp)
break
}
}
}
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}
atomic.Casuintptr(&l.key, 0, 1)