我们知道,早期的sync.Pool,底层是通过互斥锁实现对共享队列的并发访问的,这里会存在的问题是,尽管是分段锁,高并发场景下频繁进行G的wait和runable状态调度其实开销也不算小

 通过互斥锁保证并发安全的sync.Pool数据结构

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local,固定大小per-P池, 实际类型为 [P]poolLocal
	localSize uintptr        // local array 的大小
	//  New 方法在 Get 失败的情况下,选择性的创建一个值, 否则返回nil
	New func() interface{}
}

type poolLocal struct {
	poolLocalInternal

	// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
	// 每个缓存行具有 64 bytes,即 512 bit
	// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{}   // 只能被局部调度器P使用
	shared  []interface{} // 所有P共享
	Mutex                 // 读写shared前要加锁
}

通过双向链表实现的无锁sync.Pool数据结构

local和victim都指向poolLocal数组,它们的区别是: pool里面的两个poolLocal数组,每次经历STW后victim置零,将local赋值给victim,local置零。

这里的allPools和oldPools都存储的Pool集合。allPools存储的是未经历STW的Pool,使用的是pool的local字段;allPools经历STW之后将local变成victim,allPools变成oldPools。这两个变量不参与存取过程,仅在STW的时候使用。

poolCleanup

func init() {

runtime_registerPoolCleanup(poolCleanup)

}

sync.Pool在init()中向runtime注册了一个cleanup方法,它在STW1阶段被调用的

也就是说,pool中缓存的对象,最多生存2次GC就会完全回收

数据结构

我们看一下这里put和get的方法

func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
    // . . .
	l, _ := p.pin()  // 拿到g所属p的poolLocal, 并锁定

    // 这里取消了 mutex 调用.
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
        // 归还对象
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		x, _ = l.shared.popHead()  // 从本地的shared拿对象
		if x == nil {
			x = p.getSlow(pid) // 尝试偷对象
		}
	}
	runtime_procUnpin()
	if x == nil && p.New != nil {
		x = p.New() // 没偷到对象,新建一个
	}
	return x
}

func (p *Pool) getSlow(pid int) interface{} {
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
        . . .
}

// 从链表尾取对象,这里是一个并发操作
func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}
	for {
		d2 := loadPoolChainElt(&d.next)
		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			return nil, false
		}

        // 使用atomic cas来替换tail
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

为什么这里用cas替换了Mutex

大家知道cas的机制是自旋重试,它适用的场景是锁粒度小、释放快的场景,这样cas快速重试几次就能成功

而互斥锁会导致协程gopark\进入waitqueue,释放锁后又被goready叫醒再被调度

对于pool的场景下,本身是锁粒度小、释放快的场景,相比互斥锁,cas大概率原地重试几下就能拿到锁,相比切换协程状态重新调度,性能开销就会低一些。