目录

1. 什么是sync.Pool?

Go 1.3 的sync包中加入一个新特性:Pool,官方文档。

简单的说:它就是一个临时对象池,这个类设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。

2. 为什么需要sync.Pool?

增加临时对象的重用率,减少内存分配,减少GC负担,goroutine对象越多GC越慢,因为Golang进行三色标记回收的时候,要标记的也越多,自然就慢。

3. sync.Pool使用

思路:搞一个池子,预先放入临时产生的对象,然后取出使用

官方fmt包就是使用了sync.pool,由于fmt总是需要很多[]byte对象,索性就直接建了一个[]byte对象的池子。

package main

import (
	"fmt"
	"sync"
)

func main() {
	// 初始化一个pool
	pool := &sync.Pool{
		// 默认的返回值设置,不写这个参数,默认是nil
		New: func() interface{} {
			return 0
		},
	}

	// 看一下初始的值,这里是返回0,如果不设置New函数,默认返回nil
	init := pool.Get()
	fmt.Println(init)

	// 设置一个参数1
	pool.Put(1)

	// 获取查看结果
	num := pool.Get()
	fmt.Println(num)

	// 再次获取,会发现,已经是空的了,只能返回默认的值。
	num = pool.Get()
	fmt.Println(num)
}

4. sync.Pool源码分析

1) Pool结构分析

type Pool struct {
	// noCopy,防止当前类型被copy,是一个有意思的字段,后文详说。
	noCopy noCopy

    // [P]poolLocal 数组指针
	local     unsafe.Pointer
	// 数组大小
	localSize uintptr        

	// 选填的自定义函数,缓冲池无数据的时候会调用,不设置默认返回nil
	New func() interface{} //新建对象函数
}

type poolLocalInternal struct {
    // 私有缓存区
	private interface{}   
	// 公共缓存区
	shared  []interface{} 
	// 锁
	Mutex               
}

type poolLocal struct {
	// 每个P对应的pool
	poolLocalInternal

	// 这个字段很有意思,是为了防止“false sharing/伪共享”,后文详讲。
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

2) 基础函数pin,目的:确定当前P(调度器)绑定的localPool对象

流程:禁止抢占GC =》寻找偏移量=》检查越界 =》返回poolLocal=》加锁重建pool,并添加到allPool

func (p *Pool) pin() *poolLocal {
	// 返回当前 P.id && 设置禁止抢占(避免GC)
	pid := runtime_procPin()
	
	// 根据locaSize来获取当前指针偏移的位置
	s := atomic.LoadUintptr(&p.localSize) 
	l := p.local         
	
	// 有可能在运行中动调调整P,所以这里进行需要判断是否越界
	if uintptr(pid) < s {
	    // 没越界,直接返回
		return indexLocal(l, pid)
	}
	
    // 越界时,会涉及全局加锁,重新分配poolLocal,添加到全局列表
	return p.pinSlow()
}

var (
	allPoolsMu Mutex
	allPools   []*Pool
)


func (p *Pool) pinSlow() *poolLocal {
	// 取消P的禁止抢占(因为后面要进行metux加锁)
	runtime_procUnpin()
	
	// 加锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	
	// 返回当前 P.id && 设置禁止抢占(避免GC)
	pid := runtime_procPin()
	
	// 再次检查是否符合条件,有可能中途已被其他线程调用
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	
	// 如果数组为空,则新建Pool,将其添加到 allPools,GC以此获取所有 Pool 实例
	if p.local == nil {
		allPools = append(allPools, p)
	}
	
    // 根据 P 数量创建 slice
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	
	 // 将底层数组起始指针保存到 Pool.local,并设置 P.localSize
	 // 这里需要关注的是:如果GOMAXPROCS在GC间发生变化,则会重新分配的时候,直接丢弃老的,等待GC回收。
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
	atomic.StoreUintptr(&p.localSize, uintptr(size))         
	
	// 返回本次所需的 poolLocal
	return &local[pid]
}

// 根据数据结构的大小来计算指针的偏移量
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

3) put (优先放入private空间,后面再放入shared空间)

func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
    
    // 这段代码,不需要关心,降低竞争的
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}

    // 获取当前的poolLocal
	l := p.pin()

    // 如果private为nil,则优先进行设置,并标记x
	if l.private == nil {
		l.private = x
		x = nil
	}
	runtime_procUnpin()

    // 如果标记x不为nil,则将x设置到shared中
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
    
    // 设置竞争可用了。
	if race.Enabled {
		race.Enable()
	}
}

4) get

优先从private空间拿,不存在再继续加锁从shared空间拿,还没有再从其他的PoolLocal的shared空间拿,还没有就直接new一个返回。

func (p *Pool) Get() interface{} {
    // 竞争相关的设置
	if race.Enabled {
		race.Disable()
	}
    
    // 获取当前的poolLocal
	l := p.pin()

    // 从private中获取
	x := l.private
	l.private = nil
	runtime_procUnpin()

    // 不存在,则继续从shared空间拿,
	if x == nil {
	    // 加锁了,防止并发 
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
            // 从尾巴开始拿起
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
		    // 从其他的poolLocal中的shared空间看看有没有可返回的。
			x = p.getSlow()
		}
	}
    
    // 竞争解除
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
    
    // 如果还是没有的话,就直接new一个了
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

func (p *Pool) getSlow() (x interface{}) {
    // 获取poolLocal数组的大小
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	
	// 尝试从其他procs获取一个P对象
	pid := runtime_procPin()
	runtime_procUnpin()
	
	for i := 0; i < int(size); i++ {
        // 获取一个poolLocal,注意这里是从当前的local的位置开始获取的,目的是防止取到自身
		l := indexLocal(local, (pid+i+1)%int(size))
		// 加锁从尾部获取shared的数据
		l.Lock()
		last := len(l.shared) - 1
        // 若长度大于1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}

5. QA

1) pool的是永久保存的吗?

会进行清理的,时间就是两次GC间隔的时间。 sync.Pool不适合放做“数据库连接池”等带持久性质的数据,因为它会定期回收。

2) 为什么获取shared要加锁,而private不用?

golang是MPG的方式运行的,每个P都分配一个localPool,在同一个P下面只会有一个Gouroutine在跑,所以这里的private,在同一时间就只可能被一个Gouroutine获取到。而shared就不一样了,有可能被其他的P给获取走,在同一时间就只可能被多个Gouroutine获取到,为了保证数据竞争,必须加一个锁来保证只会被一个G拿走。

3) noCopy的作用?

防止Pool被拷贝,因为Pool 在Golang是全局唯一。如何实现被防止拷贝:只要包含实现 sync.Locker 这个接口的结构体noCopy,go vet 就可以帮我们进行检查是否被拷贝。

4) pad的作用?

主要就是用来防止“伪共享”的。

5) 如何保证数据存储在LocalPool数组对应的单元?

根据数据结构的大小来计算指针的偏移量,进而算出是LocalPool数组的哪个。