sync.Pool在1.12中实现的原理简述本文基于golang 1.14 对sync.Pool进行分析;
参考 Golang 的 sync.Pool设计思路与原理,这篇文章基于1.12版本对golang的sync.Pool实现原理进行了分析。关于sync.Pool的使用场景、基本概念的理解可以参考前面的文章。
在1.12中sync.Pool的设计思路简单总结一下:
- 将 G 和 P 绑定,设置与P绑定的M禁止抢占以防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
- 每个p都有独享的缓存队列,当g进行sync.pool操作时,先找到所属p的private,如果没有对象可用,加锁从 shared切片里获取数据。如果还没有拿到缓存对象,那么到其他P的poolLocal进行偷数据,如果偷不到,那么就创建新对象。
1.12 sync.pool的源码,可以发现sync.pool里会有各种的锁逻辑,从自己的shared拿数据加锁。getSlow()偷其他P缓存,也是需要给每个p加锁。put归还缓存的时候,还是会mutex加一次锁。
go mutex锁的实现原理简单说,他开始也是atomic cas自旋,默认是4次尝试,当还没有拿到锁的时候进行waitqueue gopack休眠调度处理,等待其他协程释放锁时进行goready调度唤醒。
Go 1.13之后,Go 团队对sync.Pool的锁竞争这块进行了很多优化,这里还改变了shared的数据结构,以前的版本用切片做缓存,现在换成了poolChain双端链表。这个双端链表的设计很有意思,你看sync.pool源代码会发现跟redis quicklist相似,都是链表加数组的设计。
1.14 Pool 数据结构type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
Pool.local[P]poolLocal
poolLocalInternal.shared
这里我们先重点分析一下poolChain 是怎么实现并发无锁编程的。
poolChain
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
poolChain也就是说poolChain里面每个元素poolChainElt都是一个双端队列
head指向的poolChainElt,是用于Producer去Push元素的,不需要做同步处理。
tail指向的poolChainElt,是用于Consumer从tail去pop元素的,这里的读写需要保证原子性。
poolChain
poolChainpoolDequeue
poolChain
popHead() (interface{}, bool);
pushHead(val interface{})
popTail() (interface{}, bool)
popHeadpushHeadpopTail
poolChain.popHead()
poolChain
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
popHead
poolDequeuepopHeadpoolDequeuepopHeadpoolChainpopHeadpoolDequeue
poolChain.pushHead()
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
pushHead
poolChainEltpoolChainEltpoolDequeuepoolDequeuepushHeadpoolDequeue
这里需要注意一点的是head其实是指向最后chain中最后一个结点(poolDequeue),chain执行push操作是往最后一个节点push。 所以这里的head的语义不是针对链表结构,而是针对队列结构。
poolChain.popTail()
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
popTail
popTail
poolDequeue
上面的流程是典型的的无锁并发编程。
poolDequeue
poolDequeue
poolDequeue
数据结构
type poolDequeue struct {
// 用高32位和低32位分别表示head和tail
// head是下一个fill的slot的index;
// tail是deque中最老的一个元素的index
// 队列中有效元素是[tail, head)
headTail uint64
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
headTailheadTail
- 高32位表示head;
- 低32位表示tail。
- head和tail自加溢出时是安全的。
valsring buffer
pack/unpack
headTail
const dequeueBits = 32
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
pack:
- 首先拿到mask,这里实际上就是 0xffffffff(2^32-1)
- head左移32位 | tail&0xffffffff 就可以得到head和tail pack之后的值。
unpack:
- 首先拿到mask,这里实际上就是 0xffffffff(2^32-1)
- ptrs右移32位拿到高32位然后 & mask 就可以得到head;
- ptrs直接 & mask 就可以得到低32位,也就是tail。
poolDequeue.pushHead
pushHead
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
主要逻辑:
- 原子load head和tail,
- 如果tail + len(vals) == head 说明deque已经满了。
- 拿到head在vals中index的slot
- 如果slot的type非空,说明该slot还没有被popTail release,实际上deque还是满的;所以直接return false;
- 更新val到slot的指针指向的值。
- 原子的自加head
需要注意的是,pushHead不是并发安全的,只能有一个Producer去执行;只有slot的的type指针为空时候slot才是空。
poolDequeue.popHead
popHead
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface{}
return val, true
}
主要逻辑:
poolDequeue.headTailpoolDequeue.headTailatomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
poolDequeue.popTail
这个函数是可以被Consumer并发访问的。
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
}
主要逻辑:
atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
数据结构总结
用一张图完整描述sync.Pool的数据结构:
强调一点:
- head的操作只能是local P;
- tail的操作是任意P;
参考网上一张图来看更加清晰:
Pool 并没有直接使用 poolDequeue,因为它是fixed size的,而 Pool 的大小是没有限制的。因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长。
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
Put函数主要逻辑:
p.pin()p.pin()pinSlow()allPoolsMushared.pushHead(x)
Pool.Get
func (p *Pool) Get() interface{} {
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
Get函数主要逻辑:
getSlow(pid)
这里是一个很明显的多层级缓存优化 + GPM调度结合起来。
private -> shared -> steal from other P -> victim cache -> New
victim cache优化与GC
对于Pool来说并不能够无上限的扩展,否则对象占用内存太多了,会引起内存溢出。
几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?
init函数
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// mgc.go
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
runtime_registerPoolCleanupsync_runtime_registerPoolCleanup
gcStart() -> clearpools() -> poolcleanup()
也就是每一轮GC开始都会执行pool的清除操作。
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}
poolCleanup 会在 STW 阶段被调用。整体看起来,比较简洁。主要是将 local 和 victim 作交换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在“兜底”。
对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。
在Go1.13之前的poolCleanup比较粗暴,直接清空了所有 Pool 的 p.local 和poolLocal.shared。
通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。
所以 p.victim 的作用其实就是次级缓存。
sync.Pool 总结- 关键思想是对象的复用,避免重复创建、销毁。将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力。
- sync.Pool 是协程安全的,使用起来非常方便。设置好 New 函数后,调用 Get 获取,调用 Put 归还对象。
- 不要对 Get 得到的对象有任何假设,默认Get到对象是一个空对象,Get之后手动初始化。
- 好的实践是:Put操作执行前将对象“清空”,并且确保对象被Put进去之后不要有任何的指针引用再次使用,不然极大概率导致data race。
- 第三和第四也就是考虑清楚复用对象的生命周期
- Pool 里对象的生命周期受 GC 影响,不适合于做连接池,因为连接池需要自己管理对象的生命周期。
- Pool 不可以指定⼤⼩,⼤⼩只受制于 GC 临界值。
- procPin 将 G 和 P 绑定,防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
- 在加入 victim 机制前,sync.Pool 里对象的最⼤缓存时间是一个 GC 周期,当 GC 开始时,没有被引⽤的对象都会被清理掉;加入 victim 机制后,最大缓存时间为两个 GC 周期。
- Victim Cache 本来是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入的意图在于降低 GC 压力的同时提高命中率。
- sync.Pool 的最底层使用切片加链表来实现双端队列,并将缓存的对象存储在切片中。
- sync.Pool 的设计理念,包括:无锁、操作对象隔离、原子操作代替锁、行为隔离——链表、Victim Cache 降低 GC 开销。
参考文档:
理解Go 1.13中sync.Pool的设计与实现
golang新版如何优化sync.pool锁竞争消耗?
深度解密 Go 语言之 sync.Pool