golang中goroutine由运行时管理,使用go关键字就可以方便快捷的创建一个goroutine,受限于服务器硬件内存大小,如果不对goroutine数量进行限制,会出现Out of Memory错误。但是goroutine泄漏引发的血案,想必各位gopher都经历过,通过协程池限制goroutine数一个有效避免泄漏的手段,但是自己手动实现一个协程池,总是会兼顾不到各种场景,比如释放,处理panic,动态扩容等。那么ants是公认的优秀实现协程池。

ants简介

ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果

功能

  1. 自动调度海量的 goroutines,复用 goroutines
  2. 定期清理过期的 goroutines,进一步节省资源
  3. 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
  4. 优雅处理 panic,防止程序崩溃
  5. 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
  6. 非阻塞机制

1.ants库结构

学习一个库先从结构看起吧,pool、pool_func、ants初始化一个pool等操作都在这里

ants库代码结构
  1. pool.go提供了ants.NewPool(创建协程池)、Submit(task func())提交任务
  2. pool_func.go使用NewPoolWithFunc(创建pool对象需要带具体的函数),并且使用Invoke(args interface{})进行调用,arg就是传给池函数func(interface{})的参数
  3. options.go使用函数选项模式进行参数配置
  4. ants.go给初始化默认协程池对象defaultAntsPool(默认的pool容量是math.MaxInt32)提供了公共函数

介绍完了主要的库文件后,我们进行逐个的了解,具体的使用,我们可以结合官方的使用案例进行了解,这里就不进行展开了。

2.ants中Pool创建对象

创建Pool对象需调用ants.NewPool(size, options)函数,返回一个pool的指针

先看Pool的接口,对我们创建的Pool先做个初步印象

Pool结构
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
    opts := loadOptions(options...)

    if size <= 0 {
        size = -1
    }

    if expiry := opts.ExpiryDuration; expiry < 0 {
        return nil, ErrInvalidPoolExpiry
    } else if expiry == 0 {
        opts.ExpiryDuration = DefaultCleanIntervalTime
    }

    if opts.Logger == nil {
        opts.Logger = defaultLogger
    }

    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)

    // Start a goroutine to clean up expired workers periodically.
    go p.purgePeriodically()

    return p, nil
}

ants.NewPool创建Pool过程

  1. 接收size参数作为pool的容量,如果size<=0,那么不对池子容量进行限制
  2. loadOptions对Pool的配置,比如是否阻塞模式,
  3. workerCache这个sync.Pool对象的New方法,在调用sync.Pool的Get()方法时,如果为nil,则返回workerCache.New()的结果
  4. 是否初始化Pool是进行内存预分配(size > 0),来创建不同的worker(stack、loopQueue两种模式)
  5. 使用p.lock锁创建一个条件变量
  6. 开启一个协程定期清理过期的workers

3.ants中的PoolWithFunc

ants.PoolWithFunc创建PoolWithFunc和New.Pool整体的结构很像,多了个poolFunc func(interface{})字段,也就是提交到池子的函数,然后workers的类型不一样

4.理解worker

可以查看出pool中的worker在整个流程起着很重要的作用,也就是ants中为每个任务都是由 worker 对象来处理的,每个work都会创建一个goroutine来处理任务,ants中的worker结构如下

type goWorker struct {
    //work的所属者
    pool *Pool

    //任务通道,通过这个发送给goWorker
    task chan func()

    //将work放入到队列时更新
    recycleTime time.Time
}

从ants.Pool创建对象Pool的过程第四步可以看出,通过newWorkerArray创建workers,因为workerArray是个接口,有如下方法。

type workerArray interface {
    len() int
    isEmpty() bool
    insert(worker *goWorker) error
    detach() *goWorker
    retrieveExpiry(duration time.Duration) []*goWorker
    reset()
}

通过newWorkerArray,返回实现了workerArray接口的workerStack,这里newWorkerArray其实是用了个工厂方法来实现的,根据传入的类型,并不需要知道具体实现了接口的结构体,只要实现了workerArray接口就可以返回实现者的结构体,然后调用具体的实现

5.提交任务Submit

Submit(task func())接收一个func作为参数,将task通过通道task将类型为func的函数给到goWorker,然后调用retrieveWorker返回一个可用的worker给task

func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach()
    if w != nil { // first try to fetch the worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // if the worker queue is empty and we don't run out of the pool capacity,
        // then just spawn a new worker goroutine.
        p.lock.Unlock()
        spawnWorker()
    } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // block and wait for an available worker
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}

执行过程分析:

  1. spawnWorker是一个func,从p.workerCache这个sync.Pool获取一个goWorker对象(在New.Pool中有讲到),用sync.Locker上锁
  2. 调用p.workers.detach方法(前面提到p.workers实现了workerArray接口)
  3. 如果获取到了goWorker对象就直接返回
  4. 如果worker队列为空,并且Pool还有容量,那么调用spawnWorker,调用worker的run方法启动一个新的协程处理任务
  5. run方法的实现如下,从goWorker的channel中遍历待执行的func(),执行,并且在执行完后调用revertWorker放回workers
func (w *goWorker) run() {
    w.pool.incRunning()
    go func() {

        for f := range w.task {
            if f == nil {
                return
            }
            f()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
}

6.释放和重启Pool

释放和重启Pool分别调用了Release和Reboot,这两个函数都在ants.Pool这个文件中可以找到,具体实现这里做个简单说明

  1. Release调用p.workers.reset()结束loopQueue或wokerStack中的 goroutine。都是通过发送nil到goWorker的task通道中,然后重置各个字段的值
  2. Reboot调用purgePeriodically,检测到Pool关闭了就直接退出了

7.细节

task缓冲通道

下面这个是NewPool变量workerCachesyn类型sync.Pool创建goWorker对象的代码

p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }

workerChanCap作为容量,这个变量定义在ants.go文件中的定义如下:

// workerChanCap determines whether the channel of a worker should be a buffered channel
    // to get the best performance. Inspired by fasthttp at
    // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
    workerChanCap = func() int {
        // Use blocking channel if GOMAXPROCS=1.
        // This switches context from sender to receiver immediately,
        // which results in higher performance (under go1.5 at least).
        if runtime.GOMAXPROCS(0) == 1 {
            return 0
        }

        // Use non-blocking workerChan if GOMAXPROCS>1,
        // since otherwise the sender might be dragged down if the receiver is CPU-bound.
        return 1
    }()

ants参考了著名的 Web框架fasthttp的实现。当GOMAXPROCS为 1时(即操作系统线程数为1),向通道task发送会挂起发送 goroutine,将执行流程转向接收goroutine,这能提升接收处理性能。如果GOMAXPROCS大于1,ants使用带缓冲的通道,为了防止接收 goroutine 是 CPU密集的,导致发送 goroutine 被阻塞。

自旋锁 SpinLock

在NewPool中lock,其实给lock初始化了一个自旋锁,这里是利用atomic.CompareAndSwapUint32()这个原子操作实现的,在加锁失败后不会等待,而是继续尝试,提高了加锁减锁的性能

在开发中刚好遇到需要ants,这次也做个记录作为分享,其实慢慢的会发现三方库的xx_test用例是最好的学习例子,希望能和大家一起知其然知其所以然,加油!

参考文档