一、前情提要
上一篇我们介绍了Mutex 锁资源的模式及源码分析 。mutex处于正常状态时,申请锁时与自旋状态的goroutine抢占锁失败时,会将对应的goroutine放在等待队列的头或者尾(若是唤醒的goroutine则是队头,若是新创建的goroutine则是队尾)等待唤醒;mutex处于饥饿状态时,申请锁时若没有锁资源则直接放在队尾。
// 加锁 func (m *Mutex) lockSlow() { .... // 竞争失败,使用runtime_SemacquireMutex信号量,保证不会有2个goutine获取 // 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待 // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部 runtime_SemacquireMutex(&m.sema, queueLifo, 1) ... }
二、底层源码理解
相当于操作系统中P一个锁一个资源
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes. // If lifo is true, queue waiter at the head of wait queue. // skipframes is the number of frames to omit during tracing, counting from // runtime_SemacquireMutex's caller. func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
底层调用semacquire1,也是主要的处理接口。该接口的功能是:获取 sudog 和 semaRoot ,其中 sudog 是 g 放在等待队列里的包装对象,sudog 里会有 g 的信息和一些其他的参数, semaRoot 则是队列结构体,内部是二插平衡树,把和当前 g 关联的 sudog 放到 semaRoot 里,然后把 g 的状态改为等待,调用调度器执行别的 g,此时当前 g 就停止执行了。一直到被调度器重新调度执行,会首先释放 sudog 然后再去执行别的代码逻辑
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) }
1、sudog
sudog运行时用来存放处于阻塞状态的 goroutine 的一个上层抽象,是用来实现用户态信号量的主要机制之一 例如当一个 goroutine 因为等待 channel 的数据需要进行阻塞时,sudog 会将 goroutine 及其用于等待数据的位置进行记录。阻塞的sudog组成一个树堆
type sudog struct { // 当前goroutine g *g // 表示g正在参与一个select isSelect bool // 下一个goroutine next *sudog // 上一个goroutine prev *sudog // 数据元素 elem unsafe.Pointer // data element (may point to stack) // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // 随机优先级概率 parent *sudog // semaRoot 二叉树 waitlink *sudog // g.waiting 列表 or semaRoot waittail *sudog // semaRoot c *hchan // channel }
2、semaRoot
一个 semaRoot 持有不同地址的 sudog 的树堆。每一个 sudog 可能反过来指向等待在同一个地址上的 sudog 的列表
type semaRoot struct { lock mutex treap *sudog // 平衡树的root nwait uint32 // Number of waiters. Read w/o the lock. waiter 的数量 }
3、Treap
Treap是一种平衡树,它较普通二叉查找树而言,每个节点被赋予了一个新的属性:优先级(没错就是类似优先队列的优先),对于Treap中的每个结点,除了它的权值满足二叉查找树的性质外,它的优先级还满足堆性质,也就是结点的优先级小于它所有孩子的优先级,也就是一个最小堆 所以从权值上看,Treap是一个二叉查找树;从优先级上看,Treap是一个堆。所以Treap=Tree+Heap 我们发现普通BST会不平衡是因为有序的数据会使查找路径退化成链,而随机数据使其退化的概率非常小。因此我们在Treap中赋予的这个优先级的值采用随机生成的办法,这样Treap的结构就趋于平衡了
如下图:黑色字体为优先级,旋转之后与原来的平衡树是一致的(3,7,9,11,16)。但是按照优先级进行旋转就会变成最小堆
在sudog中ticket作为优先级的字段,按照ticket进行旋转。
s.ticket = fastrand() | 1 // ticket值随机生成
4、semacquire1具体流程
- 获取一个sudog,并添加goroutine的基本信息
- 获取一个semroot,也就是Treap的根。待将sudog插入该树中,后经过旋转达到新的平衡。
- 将当前goroutine设置为等待状态,与当前m解绑,让m执行其他goroutine
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // Easy case. // 抢占到信号量直接返回 CAS抢占 if cansemacquire(addr) { return } // 没有抢到信号量 // Harder case: // increment waiter count // try cansemacquire one more time, return if succeeded // enqueue itself as a waiter // sleep // (waiter descriptor is dequeued by signaler) // 获取一个sudog s := acquireSudog() // 获取一个semaRoot root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 if profile&semaBlockProfile != 0 && blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { if t0 == 0 { t0 = cputicks() } s.acquiretime = t0 } // 阻塞 for { // 加锁 lock(&root.lock) // nwait+1 atomic.Xadd(&root.nwait, 1) // Check cansemacquire to avoid missed wakeup. if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // Any semrelease after the cansemacquire knows we're waiting // (we set nwait above), so go to sleep. // 加到 semaRoot treap 上 root.queue(addr, s, lifo) // 解锁,并且把当前 g 的状态改为等待,然后让当前的 m 调用其他的 g 执行,当前 g 相当于等待 goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3+skipframes) } releaseSudog(s) }
接下来看其中几个接口。
5、如何获得一个sudog
这里涉及到一个全局缓存池和 per-P 的缓存池。分配的时候per-P缓存池优先优先分配,当使用完毕后又再次归还给缓存池。 其遵循策略:
- 优先从 per-P 缓存中获取,如果 per-P 缓存为空,则从全局池抓取一半;
- 优先归还到 per-P 缓存,如果 per-P 缓存已满,则将 per-P 缓存的一半归还到全局池。
func acquireSudog() *sudog { mp := acquirem() // 获取当前g对应的m pp := mp.p.ptr() // 获取当前g对应的p if len(pp.sudogcache) == 0 { // 检查 per-P sudogcache 池是否存在可复用的 sudog lock(&sched.sudoglock) // per-P没有,则从全局池取到当前per-P的一半 for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache // 全局池 sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) // 添加sudog } unlock(&sched.sudoglock) // 这个时候实际是全局池为空,则新生成一个 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } n := len(pp.sudogcache) s := pp.sudogcache[n-1] // 取出一个 pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) // m.lock-- return s }
6、在semaRoot添加sudog节点
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) { s.g = getg() // 一个sudog指向当前goroutine s.elem = unsafe.Pointer(addr) s.next = nil s.prev = nil var last *sudog pt := &root.treap for t := *pt; t != nil; t = *pt { if t.elem == unsafe.Pointer(addr) { // lifo是true,放在队头 if lifo { // 用s将t替换掉 // Substitute s in t's place in treap. *pt = s s.ticket = t.ticket s.acquiretime = t.acquiretime s.parent = t.parent s.prev = t.prev s.next = t.next if s.prev != nil { s.prev.parent = s } if s.next != nil { s.next.parent = s } // Add t first in s's wait list. // 将t放入到s的等待链表的第一位 s.waitlink = t s.waittail = t.waittail if s.waittail == nil { s.waittail = t } t.parent = nil t.prev = nil t.next = nil t.waittail = nil } else { // 放在队尾 // Add s to end of t's wait list. if t.waittail == nil { t.waitlink = s } else { t.waittail.waitlink = s } t.waittail = s s.waitlink = nil } return } last = t if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { pt = &t.prev } else { pt = &t.next } } s.ticket = fastrand() | 1 s.parent = last *pt = s // 根据ticket随机值作为优先级进行旋转 for s.parent != nil && s.parent.ticket > s.ticket { if s.parent.prev == s { root.rotateRight(s.parent) } else { if s.parent.next != s { panic("semaRoot queue") } root.rotateLeft(s.parent) } } }
至此,阻塞申请一个锁资源的流程大致就是如此。当前goroutine已于对应的M解绑,相当于放在一个缓存池里面,等待有资源释放。
三、总结
我们可以看到上面的一通操作相当于操作系统的sleep信号,但是我们并没有陷入系统调用,浪费CPU等资源。而是在用户态利用缓存池屏蔽内部调度器的存在,创建基于goroutine抽象的信号量。例如,当用户态代码使用互斥锁发生竞争时,能够让用户态代码依附的 goroutine 进行 sleep,将其进行集中存放,并在可用时候被 wakeup,并被重新调度。
这就是为什么golang性能可以这么好