golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。但是goroutine泄漏引发的血案,想必各位gopher都经历过,通过协程池限制goroutine数一个有效避免泄漏的手段,但是自己手动实现一个协程池,总是会兼顾不到各种场景,比如释放,处理panic,动态扩容等。那么ants是公认的优秀实现协程池。
ants简介
ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果
功能
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
1.ants库结构
学习一个库先从结构看起吧,pool、pool_func、ants初始化一个pool等操作都在这里
- pool.go提供了ants.NewPool(创建协程池)、Submit(task func())提交任务
- pool_func.go使用NewPoolWithFunc(创建pool对象需要带具体的函数),并且使用Invoke(args interface{})进行调用,arg就是传给池函数func(interface{})的参数
- options.go使用函数选项模式进行参数配置
- ants.go给初始化默认协程池对象defaultAntsPool(默认的pool容量是math.MaxInt32)提供了公共函数
介绍完了主要的库文件后,我们进行逐个的了解,具体的使用,我们可以结合官方的使用案例进行了解,这里就不进行展开了。
2.ants中Pool创建对象
创建Pool对象需调用ants.NewPool(size, options)函数,返回一个pool的指针
先看Pool的接口,对我们创建的Pool先做个初步印象
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
if size <= 0 {
size = -1
}
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically.
go p.purgePeriodically()
return p, nil
}
ants.NewPool创建Pool过程
- 接收size参数作为pool的容量,如果size<=0,那么不对池子容量进行限制
- loadOptions对Pool的配置,比如是否阻塞模式,
- workerCache这个sync.Pool对象的New方法,在调用sync.Pool的Get()方法时,如果为nil,则返回workerCache.New()的结果
- 是否初始化Pool是进行内存预分配(size > 0),来创建不同的worker(stack、loopQueue两种模式)
- 使用p.lock锁创建一个条件变量
- 开启一个协程定期清理过期的workers
3.ants中的PoolWithFunc
ants.PoolWithFunc创建PoolWithFunc和New.Pool整体的结构很像,多了个poolFunc func(interface{})字段,也就是提交到池子的函数,然后workers的类型不一样
4.理解worker
可以查看出pool中的worker在整个流程起着很重要的作用,也就是ants中为每个任务都是由 worker 对象来处理的,每个work都会创建一个goroutine来处理任务,ants中的worker结构如下
type goWorker struct {
//work的所属者
pool *Pool
//任务通道,通过这个发送给goWorker
task chan func()
//将work放入到队列时更新
recycleTime time.Time
}
从ants.Pool创建对象Pool的过程第四步可以看出,通过newWorkerArray创建workers,因为workerArray是个接口,有如下方法。
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
通过newWorkerArray,返回实现了workerArray接口的workerStack,这里newWorkerArray其实是用了个工厂方法来实现的,根据传入的类型,并不需要知道具体实现了接口的结构体,只要实现了workerArray接口就可以返回实现者的结构体,然后调用具体的实现
5.提交任务Submit
Submit(task func())接收一个func作为参数,将task通过通道task将类型为func的函数给到goWorker,然后调用retrieveWorker返回一个可用的worker给task
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait() // block and wait for an available worker
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
执行过程分析:
- spawnWorker是一个func,从p.workerCache这个sync.Pool获取一个goWorker对象(在New.Pool中有讲到),用sync.Locker上锁
- 调用p.workers.detach方法(前面提到p.workers实现了workerArray接口)
- 如果获取到了goWorker对象就直接返回
- 如果worker队列为空,并且Pool还有容量,那么调用spawnWorker,调用worker的run方法启动一个新的协程处理任务
- run方法的实现如下,从goWorker的channel中遍历待执行的func(),执行,并且在执行完后调用revertWorker放回workers
func (w *goWorker) run() {
w.pool.incRunning()
go func() {
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
6.释放和重启Pool
释放和重启Pool分别调用了Release和Reboot,这两个函数都在ants.Pool这个文件中可以找到,具体实现这里做个简单说明
- Release调用p.workers.reset()结束loopQueue或wokerStack中的 goroutine。都是通过发送nil到goWorker的task通道中,然后重置各个字段的值
- Reboot调用purgePeriodically,检测到Pool关闭了就直接退出了
7.细节
task缓冲通道
下面这个是NewPool变量workerCachesyn类型sync.Pool创建goWorker对象的代码
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
workerChanCap作为容量,这个变量定义在ants.go文件中的定义如下:
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()
ants参考了著名的 Web框架fasthttp的实现。当GOMAXPROCS为 1时(即操作系统线程数为1),向通道task发送会挂起发送 goroutine,将执行流程转向接收goroutine,这能提升接收处理性能。如果GOMAXPROCS大于1,ants使用带缓冲的通道,为了防止接收 goroutine 是 CPU密集的,导致发送 goroutine 被阻塞。
自旋锁 SpinLock
在NewPool中lock,其实给lock初始化了一个自旋锁,这里是利用atomic.CompareAndSwapUint32()这个原子操作实现的,在加锁失败后不会等待,而是继续尝试,提高了加锁减锁的性能
在开发中刚好遇到需要ants,这次也做个记录作为分享,其实慢慢的会发现三方库的xx_test用例是最好的学习例子,希望能和大家一起知其然知其所以然,加油!
参考文档