我们知道,早期的sync.Pool,底层是通过互斥锁实现对共享队列的并发访问的,这里会存在的问题是,尽管是分段锁,高并发场景下频繁进行G的wait和runable状态调度其实开销也不算小
通过互斥锁保证并发安全的sync.Pool数据结构
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local,固定大小per-P池, 实际类型为 [P]poolLocal
localSize uintptr // local array 的大小
// New 方法在 Get 失败的情况下,选择性的创建一个值, 否则返回nil
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
// 每个缓存行具有 64 bytes,即 512 bit
// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // 只能被局部调度器P使用
shared []interface{} // 所有P共享
Mutex // 读写shared前要加锁
}
通过双向链表实现的无锁sync.Pool数据结构
local和victim都指向poolLocal数组,它们的区别是: pool里面的两个poolLocal数组,每次经历STW后victim置零,将local赋值给victim,local置零。
这里的allPools和oldPools都存储的Pool集合。allPools存储的是未经历STW的Pool,使用的是pool的local字段;allPools经历STW之后将local变成victim,allPools变成oldPools。这两个变量不参与存取过程,仅在STW的时候使用。
poolCleanup
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
sync.Pool在init()中向runtime注册了一个cleanup方法,它在STW1阶段被调用的
也就是说,pool中缓存的对象,最多生存2次GC就会完全回收
数据结构
我们看一下这里put和get的方法
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// . . .
l, _ := p.pin() // 拿到g所属p的poolLocal, 并锁定
// 这里取消了 mutex 调用.
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
// 归还对象
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
x, _ = l.shared.popHead() // 从本地的shared拿对象
if x == nil {
x = p.getSlow(pid) // 尝试偷对象
}
}
runtime_procUnpin()
if x == nil && p.New != nil {
x = p.New() // 没偷到对象,新建一个
}
return x
}
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
. . .
}
// 从链表尾取对象,这里是一个并发操作
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
// 使用atomic cas来替换tail
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
为什么这里用cas替换了Mutex
大家知道cas的机制是自旋重试,它适用的场景是锁粒度小、释放快的场景,这样cas快速重试几次就能成功
而互斥锁会导致协程gopark\进入waitqueue,释放锁后又被goready叫醒再被调度
对于pool的场景下,本身是锁粒度小、释放快的场景,相比互斥锁,cas大概率原地重试几下就能拿到锁,相比切换协程状态重新调度,性能开销就会低一些。