什么是池?

image

由于对池的概念很不清晰,引发了自己对 golang 中 sync.pool 的探究理解和学习

sync.pool

先看一个实例:


package main

import (
	"runtime/debug"
	"sync/atomic"
	"sync"
	"fmt"
	"runtime"
)

func main() {
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	var count int32
	newfun := func() interface{} {
		return atomic.AddInt32(&count, 1)
	}

	pool := sync.Pool{New: newfun}

	v1 := pool.Get()
	fmt.Printf("v1 :%v\n", v1)

	pool.Put(9)
	pool.Put(10)
	pool.Put(11)
	pool.Put(12)

	v2 := pool.Get()
	fmt.Printf("v2 :%v\n", v2)

	debug.SetGCPercent(100)
	runtime.GC()

	v3 := pool.Get()
	fmt.Printf("v3 :%v\n", v3)

	pool.New = nil

	v4 := pool.Get()
	fmt.Printf("v4 :%v\n", v4)
}


解决的问题

设计对象缓存池,除避免内存分配操作开销外,更多的是为了避免分配大量临时对象对垃圾回收器造成负面影响。

与调度之间的一些联系
    m-p-g(m:n:n)
        
    m------p----- poolLocal    
     *   * |        
      * *  g - g
       *       |
      * *      g
     *   *    ...
    m------P----- poolLocal  
           |
           |
           g---g
               |
               g
              ...

(关于调度和 gc 模块还没能深入综合理解及其联系,以上仅供参考)

pool 的两个特点

1、在本地私有池和本地共享池均获取失败则会从其他 p 偷一个返回给调用方

2、对象在池中的生命周期取决于垃圾回收任务的下一次执行时间并且从池中获取到的值可能是 put 进去的其中一个值也可能是 newfun 新生成的一个值,在应用时很容易入坑

包中具体的实现函数和结构体

Pool 用 local 和 localSize 维护一个动态 poolLocal 数组。


type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer //[P]poolLocal 数组指针 
	localSize uintptr        // 数组大小

	
	New func() interface{} //新建对象函数
}

type poolLocalInternal struct {
	private interface{}   // 私有缓存区
	shared  []interface{} // 公共缓存区
	Mutex                 // 
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}




无论是 Get(),还是 Put()操作都会通过 pin 来返回与当前 P 绑定的 poolLocal 对象,这里面就有初始化的关键


func (p *Pool) pin() *poolLocal {
	// 返回当前 P.id
	pid := runtime_procPin()

	
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local         // load-consume
	 // 如果 P.id 没有超出数组索引限制,则直接返回
     // 这是考虑到 procresize/GOMAXPROCS 的影响
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
    // 没有结果时,会涉及全局加锁操作
    // 比如重新分配数组内存,添加到全局列表
	return p.pinSlow()
}


func (p *Pool) pinSlow() *poolLocal {
	// M.lock--
	runtime_procUnpin()
	// 加锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// 再次检查是否符合条件,可能中途已被其他线程调用
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	// 如果数组为空,新建
    // 将其添加到 allPools,垃圾回收器以此获取所有 Pool 实例
	if p.local == nil {
		allPools = append(allPools, p)
	}
    // 根据 P 数量创建 slice
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	 // 将底层数组起始指针保存到 Pool.local,并设置 P.localSize
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
	atomic.StoreUintptr(&p.localSize, uintptr(size))         
	// 返回本次所需的 poolLocal
	return &local[pid]
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}


1、Get()获取对象时 优先从 private 空间获取 -> 没有则加锁从 share 空间获取(从尾部开始获取)-> 没有再 new func 新的对象(此对象不会放回池中)

2、注意:Get 操作后(在返回之前就会将它从池中删除),缓存对象彻底与 Pool 失去引用关联,需要自行 Put 放回。


func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l := p.pin()
	x := l.private
	l.private = nil
	runtime_procUnpin()
	if x == nil {
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
			x = p.getSlow()
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

func (p *Pool) getSlow() (x interface{}) {
	
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	// 尝试从其他procs获取一个对象
	pid := runtime_procPin()
	runtime_procUnpin()
	for i := 0; i < int(size); i++ {
		l := indexLocal(local, (pid+i+1)%int(size))
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}


Put()优先放入 private 空间-> 其次再考虑 share 空间


func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
	if race.Enabled {
		race.Enable()
	}
}


由 poolCleanup()可知其操作很简单粗暴清空,并且其需要额外注册 runtime_registerPoolCleanup()的



func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.
	// Defensively zero out everything, 2 reasons:
	// 1. To prevent false retention of whole Pools.
	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
	for i, p := range allPools {
		allPools[i] = nil
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
			l.private = nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
	allPools = []*Pool{}
}

var (
	allPoolsMu Mutex
	allPools   []*Pool
)

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

// Implemented in runtime.
func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin() int
func runtime_procUnpin()



在 1.5 版本中的一点点区别

image
image

以上总结:sync.Pool 的定位不是做类似连接池的东西,它的用途仅仅是增加对象重用的几率,减少 gc 的负担,在开销方面也不是很低,在调度方面和 gc 方面还需要串着多看,多理解其原理才行。
参考资料:

《Go 并发编程第二版-郝林》

《Go 语言学习笔记-雨痕》