作者 | Leo叔叔 ;责编 | 欧阳姝黎
sync.Pool
sync.PoolGoPool
Talk is cheap,Show me your code

因为Go1.13版本后对sync.Pool做了优化,放弃了利用sync.Mutex加锁的方式该用CAS加带环形数组的双向链表的方式来实现,本文基于Go1.15.8最新稳定版本分析。

基本使用

package main

import "sync"

type Person struct {
 Age int
}

// 初始化pool
var personPool = sync.Pool{
 New: func() interface{} {
  return new(Person)
 },
}

func main() {
 // 获取一个实例
 newPerson := personPool.Get().(*Person)
 // 回收对象 以备其他协程使用
 defer personPool.Put(newPerson)

 newPerson.Age = 25
}

使用起来比较简单大概分三步:

PoolGetPut
sync.Mutexsync.Pool
sync.Pool
import (
 "testing"
)

func BenchmarkWithoutPool(b *testing.B) {
 var p *Person
 b.ReportAllocs()
 b.ResetTimer()
 for i := 0; i < b.N; i++ {
  for j := 0; j < 10000; j++ {
   p = new(Person)
   p.Age = 30
  }
 }
}

func BenchmarkWithPool(b *testing.B) {
 var p *Person
 b.ReportAllocs()
 b.ResetTimer()
 for i := 0; i < b.N; i++ {
  for j := 0; j < 10000; j++ {
   p = personPool.Get().(*Person)
   p.Age = 30
   personPool.Put(p)
  }
 }
}

基准测试结果:

BenchmarkWithoutPool
BenchmarkWithoutPool-8        7630     135523 ns/op    80000 B/op    10000 allocs/op
BenchmarkWithPool
BenchmarkWithPool-8        9865     126072 ns/op        0 B/op        0 allocs/op

工作原理

没有啥一张图搞不定的

allPools

如果不行 那就再来一张

pool architecture

sync.Pool数据结构

type Pool struct {
 noCopy noCopy
 // 实际指向[]poolLocal 每个P对应一个poolLocal 数组大小取决于P的数量 runtime.GOMAXPROCS(0)
 local     unsafe.Pointer 
 localSize uintptr        // []poolLocal的大小

 victim     unsafe.Pointer // local from previous cycle
 victimSize uintptr        // size of victims array
  
  //当缓存池无对应对象时调用
 New func() interface{}
}
Go1.13sync.PoolvictimvictimSize
sync.Poolsync.poolLocalruntime.GOMAXPROCS(0)
type poolLocal struct {
 poolLocalInternal
 // Prevents false sharing on widespread platforms with
 // 128 mod (cache line size) = 0 .
 pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
 private interface{} // 只能被对应的P使用
 shared  poolChain   // 本地的P可以从Head 进行pushHead/popHead 其他的P可以popTail.
}
poolLocalprivateshared
pool chain
poolDequeuevalsheadTail
headTail

代码佐证:

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
 const mask = 1<<dequeueBits - 1
 head = uint32((ptrs >> dequeueBits) & mask)
 tail = uint32(ptrs & mask)
 return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
 const mask = 1<<dequeueBits - 1
 return (uint64(head) << dequeueBits) |
  uint64(tail&mask)
}
sync.PoolpoolDequeuepoolChainElt

操作方法

sync.Pool

获取对象 p.Get

获取对象,大体流程:

goroutinePruntime_procPinpoolLocalid
func (p *Pool) Get() interface{} {
  // 将当前goroutine与P进行绑定 runtime_procPin禁用抢占
  // 返回poolLocal与P的id
 l, pid := p.pin()
 x := l.private //尝试直接从私有空间拿
 l.private = nil
 if x == nil {
    //从共享区域头部拿
  x, _ = l.shared.popHead()
  if x == nil {
      //直接实在没有 尝试去别人那边看看能不能偷个
   x = p.getSlow(pid)
  }
 }
  // 解除抢占禁用
 runtime_procUnpin()
  // 都没有 那只好自己New一个
 if x == nil && p.New != nil {
  x = p.New()
 }
 return x
}

那么我们来看看goroutine 是怎么跟P绑定的

func (p *Pool) pin() (*poolLocal, int) {
 pid := runtime_procPin()
  // pinSlow中我们先存储local再存储localSize,这里我们以相反顺序加载
  // 因为我们已经禁用了抢占 GC这期间不会发生 因此我们需要观察local的大小至少跟localSize一样
 s := atomic.LoadUintptr(&p.localSize) // load-acquire
 l := p.local                          // load-consume
 if uintptr(pid) < s {
  return indexLocal(l, pid), pid
 }
  // 运行过程中可能会存在调整P的情况 或者GC了
 return p.pinSlow()
}
runtime_procPin()

番外:禁止抢占

func runtime_procPin() int
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
 return procPin()
}
//go:nosplit
func procPin() int {
 _g_ := getg()
 mp := _g_.m

 mp.locks++
 return int(mp.p.ptr().id)
}
procPinprocPingoroutinegorountinelocks
goroutine
mGo runtimeP
  • 第一种情况,进行系统调用的G,因为存在阻塞,傻傻等在那里会比较浪费计算资源,为了让其他goroutine不被饿死
  • 第二种情况,如果一个G运行时间太长,P中其他G得不到执行也会饿死

抢占实现

Gosysmonruntime.mainsysmonGPMGMsysmonnetpoolretakeforcegcscavengeheapretake
//go:nowritebarrierrec
func sysmon() {
  ...
 // retake P's blocked in syscalls
  // and preempt long running G's
  if retake(now) != 0 {
   idle = 0
  } else {
   idle++
  }
  ...
}
func retake(now int64) uint32 {
 ... 
if s == _Prunning || s == _Psyscall {
   // Preempt G if it's running for too long.
   t := int64(_p_.schedtick)
   if int64(pd.schedtick) != t {
    pd.schedtick = uint32(t)
    pd.schedwhen = now
   } else if pd.schedwhen+forcePreemptNS <= now {//G运行时间超过forcePreemptNS
    preemptone(_p_)
    // In case of syscall, preemptone() doesn't
    // work, because there is no M wired to P.
    sysretake = true
   }
  ...
}
GforcePreemptNS(10ms)preemptone(_p_)P
func preemptone(_p_ *p) bool {
 mp := _p_.m.ptr()
 if mp == nil || mp == getg().m {
  return false
 }
 gp := mp.curg
 if gp == nil || gp == mp.g0 {
  return false
 }

 gp.preempt = true

 // Every call in a go routine checks for stack overflow by
 // comparing the current stack pointer to gp->stackguard0.
 // Setting gp->stackguard0 to StackPreempt folds
 // preemption into the normal stack overflow check.
 gp.stackguard0 = stackPreempt

 // Request an async preemption of this P.
 if preemptMSupported && debug.asyncpreemptoff == 0 {
  _p_.preempt = true
  preemptM(mp)
 }

 return true
} 
gp.preemptgp.stackguard0goroutinestackguard0(1<<(8*sys.PtrSize) - 1)& -1314PGstackguard0SPmorestack
//以asm_amd64.s为例
TEXT runtime·morestack(SB),NOSPLIT,$0-0
	... ...
	// Call newstack on m->g0's stack.
	MOVQ	m_g0(BX), BX
	MOVQ	BX, g(CX)
	MOVQ	(g_sched+gobuf_sp)(BX), SP
	CALL	runtime·newstack(SB)
	CALL	runtime·abort(SB)	// crash if newstack returns
	RET
morestacknewstack
//go:nowritebarrierrec
func newstack() {
  ... ...
 if preempt {
  if !canPreemptM(thisg.m) {
   // Let the goroutine keep running for now.
   // gp->preempt is set, so it will be preempted next time.
   gp.stackguard0 = gp.stack.lo + _StackGuard
   gogo(&gp.sched) // never return
  }
 }
 ... ... 
}
//go:nosplit
func canPreemptM(mp *m) bool {
 return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}
newstackmp.locks!=0
gopreempt_m(gp)goschedImpl(gp)goroutinecasgstatus(gp_Grunning, _Grunnable)goroutine
runtime_procPingoroutinemlocks

但是还有个问题,为啥GC也拿它没办法?

GoGC
runtime.sysmonruntime.forcegchelperruntime.mallocgc
gcStart(trigger gcTrigger)
func stopTheWorldWithSema() {
 _g_ := getg()

 // If we hold a lock, then we won't be able to stop another M
 // that is blocked trying to acquire the lock.
 if _g_.m.locks > 0 {
  throw("stopTheWorld: holding locks")
 }
 lock(&sched.lock)
 sched.stopwait = gomaxprocs
 atomic.Store(&sched.gcwaiting, 1)
 preemptall()
 // stop current P
 _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
 sched.stopwait--
 // try to retake all P's in Psyscall status
 for _, p := range allp {
  s := p.status
  if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
   if trace.enabled {
    traceGoSysBlock(p)
    traceProcStop(p)
   }
   p.syscalltick++
   sched.stopwait--
  }
 }
 // stop idle P's
 for {
  p := pidleget()
  if p == nil {
   break
  }
  p.status = _Pgcstop
  sched.stopwait--
 }
 wait := sched.stopwait > 0
 unlock(&sched.lock)

 // wait for remaining P's to stop voluntarily
 if wait {
  for {
   // wait for 100us, then try to re-preempt in case of any races
   if notetsleep(&sched.stopnote, 100*1000) {
    noteclear(&sched.stopnote)
    break
   }
   preemptall()
  }
 }

 // sanity checks
 bad := ""
 if sched.stopwait != 0 {
  bad = "stopTheWorld: not stopped (stopwait != 0)"
 } else {
  for _, p := range allp {
   if p.status != _Pgcstop {
    bad = "stopTheWorld: not stopped (status != _Pgcstop)"
   }
  }
 }
 if atomic.Load(&freezing) != 0 {
  // Some other thread is panicking. This can cause the
  // sanity checks above to fail if the panic happens in
  // the signal handler on a stopped thread. Either way,
  // we should halt this thread.
  lock(&deadlock)
  lock(&deadlock)
 }
 if bad != "" {
  throw(bad)
 }
}
preemptall()PPPPstopPPpreemptall()
func preemptall() bool {
 res := false
 for _, _p_ := range allp {
  if _p_.status != _Prunning {
   continue
  }
  if preemptone(_p_) {
   res = true
  }
 }
 return res
} 
preemptone(_p_)GCSTWGC
runtime_procPinruntime_procUnpingoroutinePPidpid
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
 lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
 return (*poolLocal)(lp)
} 
Pp.pinSlow()
func (p *Pool) pinSlow() (*poolLocal, int) {
 // Retry under the mutex.
 // Can not lock the mutex while pinned.
 runtime_procUnpin()
 allPoolsMu.Lock()
 defer allPoolsMu.Unlock()
 pid := runtime_procPin()
 // poolCleanup won't be called while we are pinned.
 s := p.localSize
 l := p.local
 if uintptr(pid) < s {
  return indexLocal(l, pid), pid
 }
 if p.local == nil {
  allPools = append(allPools, p)
 }
 // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
 size := runtime.GOMAXPROCS(0)
 local := make([]poolLocal, size)
 atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
 atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
 return &local[pid], pid
}
pinSlow()PallPoolsMu Mutexuintptr(pid) < sallPools []*PoolGCvictim
poolLocal
privatex, _ = l.shared.popHead()
func (c *poolChain) popHead() (interface{}, bool) {
 d := c.head
 for d != nil {
  if val, ok := d.popHead(); ok {
   return val, ok
  }
  // There may still be unconsumed elements in the
  // previous dequeue, so try backing up.
  d = loadPoolChainElt(&d.prev)
 }
 return nil, false
}
PoolChainEltprevd.popHead()
func (d *poolDequeue) popHead() (interface{}, bool) {
 var slot *eface
 for {
  ptrs := atomic.LoadUint64(&d.headTail)
  head, tail := d.unpack(ptrs)
  if tail == head {
   // Queue is empty.
   return nil, false
  }

  // Confirm tail and decrement head. We do this before
  // reading the value to take back ownership of this
  // slot.
  head--
  ptrs2 := d.pack(head, tail)
  if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
   // We successfully took back slot.
   slot = &d.vals[head&uint32(len(d.vals)-1)]
   break
  }
 }

 val := *(*interface{})(unsafe.Pointer(slot))
 if val == dequeueNil(nil) {
  val = nil
 }
 // Zero the slot. Unlike popTail, this isn't racing with
 // pushHead, so we don't need to be careful here.
 *slot = eface{}
 return val, true
}

逻辑也比较简单

headTail

2.2 接着将head索引减1,然后将head、tail再打包回去,通过CAS判断当前没有并发修改就拿到数据 跳出循环 否则循环等待

2.3 将slot转为interface{}类型

2.4 将slot赋值为eface{}

Pp.getSlow(pid)
func (p *Pool) getSlow(pid int) interface{} {
 // See the comment in pin regarding ordering of the loads.
 size := atomic.LoadUintptr(&p.localSize) // load-acquire
 locals := p.local                        // load-consume
 // Try to steal one element from other procs.
 for i := 0; i < int(size); i++ {
  l := indexLocal(locals, (pid+i+1)%int(size))
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // Try the victim cache. We do this after attempting to steal
 // from all primary caches because we want objects in the
 // victim cache to age out if at all possible.
 size = atomic.LoadUintptr(&p.victimSize)
 if uintptr(pid) >= size {
  return nil
 }
 locals = p.victim
 l := indexLocal(locals, pid)
 if x := l.private; x != nil {
  l.private = nil
  return x
 }
 for i := 0; i < int(size); i++ {
  l := indexLocal(locals, (pid+i)%int(size))
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // Mark the victim cache as empty for future gets don't bother
 // with it.
 atomic.StoreUintptr(&p.victimSize, 0)

 return nil
} 
l.shared.popTail() 
func (c *poolChain) popTail() (interface{}, bool) {
 d := loadPoolChainElt(&c.tail)
 if d == nil {
  return nil, false
 }

 for {
  // It's important that we load the next pointer
  // *before* popping the tail. In general, d may be
  // transiently empty, but if next is non-nil before
  // the pop and the pop fails, then d is permanently
  // empty, which is the only condition under which it's
  // safe to drop d from the chain.
  d2 := loadPoolChainElt(&d.next)

  if val, ok := d.popTail(); ok {
   return val, ok
  }

  if d2 == nil {
   // This is the only dequeue. It's empty right
   // now, but could be pushed to in the future.
   return nil, false
  }

  // The tail of the chain has been drained, so move on
  // to the next dequeue. Try to drop it from the chain
  // so the next pop doesn't have to look at the empty
  // dequeue again.
  if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
   // We won the race. Clear the prev pointer so
   // the garbage collector can collect the empty
   // dequeue and so popHead doesn't back up
   // further than necessary.
   storePoolChainElt(&d2.prev, nil)
  }
  d = d2
 }
}
nextPoolChainEltd.popTail()
func (d *poolDequeue) popTail() (interface{}, bool) {
 var slot *eface
 for {
  ptrs := atomic.LoadUint64(&d.headTail)
  head, tail := d.unpack(ptrs)
  if tail == head {
   // Queue is empty.
   return nil, false
  }
  ptrs2 := d.pack(head, tail+1)
  if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
   slot = &d.vals[tail&uint32(len(d.vals)-1)]
   break
  }
 }
 val := *(*interface{})(unsafe.Pointer(slot))
 if val == dequeueNil(nil) {
  val = nil
 }
 slot.val = nil
 atomic.StorePointer(&slot.typ, nil)
 return val, true
}
popHeadheadTail
popHeadatomic.StorePointer(&slot.typ, nil)pushHeadpopTail

3.2 那如果偷都偷不到,会进行以下操作

size = atomic.LoadUintptr(&p.victimSize)
 if uintptr(pid) >= size {
  return nil
 }
 locals = p.victim
 l := indexLocal(locals, pid)
 if x := l.private; x != nil {
  l.private = nil
  return x
 }
 for i := 0; i < int(size); i++ {
  l := indexLocal(locals, (pid+i)%int(size))
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // Mark the victim cache as empty for future gets don't bother
 // with it.
 atomic.StoreUintptr(&p.victimSize, 0)
victim cache
受害者缓存是由Norman Jouppi提出的一种提高缓存性能的硬件技术。如他的论文所述
Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.

大概意思就是在旧缓存和缓解重建的过程中,添加一个全关联的缓存(保存旧缓存数据)。也就是说当一级缓存踢出的数据,放到受害者缓存中。当我们在一级缓存未命中,则可以继续尝试从受害者缓存中查询。

如代码:

size = atomic.LoadUintptr(&p.victimSize)
 if uintptr(pid) >= size {
  return nil
 }
 locals = p.victim
 l := indexLocal(locals, pid)
 if x := l.private; x != nil {
  l.private = nil
  return x
 }
 for i := 0; i < int(size); i++ {
  l := indexLocal(locals, (pid+i)%int(size))
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // Mark the victim cache as empty for future gets don't bother
 // with it.
 atomic.StoreUintptr(&p.victimSize, 0)

如果能理解,其实还是挺简单的,也就是

local1 ->GC ->local2 victim->local1
Local2 ->GC ->local3 victim->local2
  1. 很遗憾getSlow也没拿到 那只好自己手动new一个了
if x == nil && p.New != nil {
  x = p.New()
 }

用完返回Pool p.Put

GetPut
func (p *Pool) Put(x interface{}) {
 if x == nil {
  return
 }
  // 将goroutine与P绑定 runtime_procPin禁用抢占 返回poolLocal
 l, _ := p.pin()
 if l.private == nil {//优先放到私有空间
  l.private = x
  x = nil
 }
 if x != nil { //放回共享空间
  l.shared.pushHead(x)
 }
  // 解除抢占禁用
 runtime_procUnpin()
} 

基本逻辑:

p.pinpoolLocal
func (c *poolChain) pushHead(val interface{}) {
 d := c.head
 if d == nil {
  // Initialize the chain.
  const initSize = 8 // Must be a power of 2
  d = new(poolChainElt)
  d.vals = make([]eface, initSize)
  c.head = d
  storePoolChainElt(&c.tail, d)
 }
 if d.pushHead(val) {
  return
 }

 newSize := len(d.vals) * 2
 if newSize >= dequeueLimit {
  // Can't make it any bigger.
  newSize = dequeueLimit
 }

 d2 := &poolChainElt{prev: d}
 d2.vals = make([]eface, newSize)
 c.head = d2
 storePoolChainElt(&d.next, d2)
 d2.pushHead(val)
}

putHead逻辑主要是将对象放到双向链表的对应节点的环形数组中。

先获取双向链表的head节点
若head节点为空 则初始化head节点 节点对应环形数组初始大小为8
将对象放到环形数组中

func (d *poolDequeue) pushHead(val interface{}) bool {
 ptrs := atomic.LoadUint64(&d.headTail)
 head, tail := d.unpack(ptrs)
 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  // Queue is full.
  return false
 }
 slot := &d.vals[head&uint32(len(d.vals)-1)]
 typ := atomic.LoadPointer(&slot.typ)
 if typ != nil {// popTail可能还没处理完
  return false
 }

 // The head slot is free, so we own it.
 if val == nil {
  val = dequeueNil(nil)
 }
 *(*interface{})(unsafe.Pointer(slot)) = val
 atomic.AddUint64(&d.headTail, 1<<dequeueBits)
 return true
}
popHeadpushHeadpopTailslot.typpopTail

关于GC清除数据问题

pool.go
gcTriggergcStart()clearpools()poolCleanup()
func init() {
 runtime_registerPoolCleanup(poolCleanup)
}
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
 poolcleanup = f
}
func poolCleanup() {
 for _, p := range oldPools {
  p.victim = nil
  p.victimSize = 0
 }

 for _, p := range allPools {
  p.victim = p.local
  p.victimSize = p.localSize
  p.local = nil
  p.localSize = 0
 }

 oldPools, allPools = allPools, nil
} 
victim

最后的最后,细心的你可能发现 还遗漏了两个细节

noCopy

sync.PoolnoCopysync.PoolGogo vet

举个例子

type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
type People struct {
 noCopy noCopy
}

func say(p People) {

}

func main() {
 var p People
 say(p)
}


go vet demo.go


输出:

# command-line-arguments
./demo.go:12:12: say passes lock by value: command-line-arguments.People contains command-line-arguments.noCopy
./demo.go:18:6: call of say copies lock value: command-line-arguments.People contains command-line-arguments.noCopy

当然直接执行不会报任何错

pad

type poolLocal struct {
 poolLocalInternal

 // Prevents false sharing on widespread platforms with
 // 128 mod (cache line size) = 0 .
 pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
pad伪共享

缓存系统中我们是以缓存行(cache line)为单位,通常大小为64字节。上面这张图,我们可以看到L1、L2、L3三级缓存他们和内存的读取速度当然取决于他们与CPU紧密程度。L1>L2>L3>内存

但是!我们现在使用的都是多核CPU的计算机,如何保证多核看到的数据的一致性呢?这里我们需要谈到一个协议-MESI协议,M、E、S、I分别表示缓存行的4个状态

M(修改,Modified):本地处理器已经修改缓存行,即是脏行,它的内容与内存中的内容不一样,并且此 cache 只有本地一个拷贝(专有);
E(专有,Exclusive):缓存行内容和内存中的一样,而且其它处理器都没有这行数据;
S(共享,Shared):缓存行内容和内存中的一样, 有可能其它处理器也存在此缓存行的拷贝;
I(无效,Invalid):缓存行失效, 不能使用。

他们转换关系如下:

现在假设我们有以下场景

cache line

怎么解决呢?

@Contended

总结

sync.PoolForkJoinPool
  • work stealing算法
  • CAS如何做到lock-free
  • 设置抢占标志 禁止P被占用 并制止GC
  • Victim cache 受害者缓存是怎么回事儿
  • noCopy是干啥的 怎么实现禁止拷贝
  • 伪共享(false share)
  • Pool GC的机制

不过这也符合Go“少即是多”的设计理念。

感谢阅读,我是 @Golang发烧友
想学习更多Golang编程知识,点击下方卡片试试吧!

Go学习路线图:

Go面试思维导图:

Go语言编程: