对于支持CSP并发编程模型的Golang而言,要实现一个协程池是非常简单的。对于习惯了基于线程模型完成并发编程的开发同学,可能初次接触会有点难理解,但是俗话说"书读百遍其义自见",百来行的代码也并不多。

我们的目标是实现一个具有以下特性的协程池(熟悉Java的话,基本上就是实现了ExecutorService接口中的主要方法):

  1. 能够指定任务队列长度和工作协程的数量使用任务队列
  2. 能够支持启动和停止
  3. 能够等待投递到其中的任务执行完毕(等待时间可以指定)

核心的三个数据结构:

  1. 协程池
  2. 工作协程
  3. 任务

直接上代码:

package pool

import (
   "sync"
   "time"
)

const (
   defaultWorkerQueueLength = 10   // 默认工作协程数量
   defaultJobQueueLength    = 1000 // 默认任务队列长度
)

type Job func()

type TimeoutPool struct {
   workerQueue chan *worker
   jobQueue    chan Job
   jobCount    int
   jobRet      chan struct{}
   stop        chan struct{}
   terminated  chan struct{}
   lock        sync.Mutex
}

// 初始化一个带有执行超时时间的协程池,协程数量:10;任务队列长度1000
func NewTimeoutPoolWithDefaults() *TimeoutPool {
   return NewTimeoutPool(defaultWorkerQueueLength, defaultJobQueueLength)
}

// 初始化一个带有执行超时时间的协程池,指定worker数量以及任务队列长度
func NewTimeoutPool(workerQueueLen, jobQueueLen int) *TimeoutPool {
   pool := &TimeoutPool{
      workerQueue: make(chan *worker, workerQueueLen),
      jobQueue:    make(chan Job, jobQueueLen),
      jobRet:      make(chan struct{}, jobQueueLen),
      stop:        make(chan struct{}),
      terminated:  make(chan struct{}),
   }

   return pool
}

// 停止协程池运行,如果有正在运行中的任务会等待其运行完毕
func (p *TimeoutPool) Terminate() {
   p.stop <- struct{}{}
}

// 提交一个任务到协程池
func (p *TimeoutPool) Submit(job Job) {
   p.jobQueue <- job

   p.lock.Lock()
   p.jobCount += 1
   p.lock.Unlock()
}

// 启动并等待协程池内的运行全部运行结束 - 如果没有主动停止,如果有任务还在执行中会一直等待
// 如果返回true表示在规定时间范围内成功结束;返回false表示主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWaitUntilTerminated() bool {
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount {
      select {      
      case <-p.terminated:
         return false
      default:
         select {
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         }
      }
   }

   return true
}

// 启动并等待协程池内的运行全部运行结束
// 如果返回true表示在规定时间范围内成功结束;返回false表示运行整体超时或者主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool {
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount {
      select {
      case <-p.terminated:
         return false
      case <-time.After(timeout):
         return false
      default:
         select {
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         case <-time.After(timeout):
            return false
         }
      }
   }

   return true
}

// ~~ 内部实现

func (p *TimeoutPool) start() {
   for i := 0; i < cap(p.workerQueue); i++ {
      newWorker(p.workerQueue, p.jobRet)
   }

   go p.dispatch()
}

func (p *TimeoutPool) dispatch() {
   for {
      var job Job
      select {
      case job = <-p.jobQueue:
         worker := <-p.workerQueue
         worker.jobChannel <- job
      case <-p.stop:
         for i := 0; i < cap(p.workerQueue); i++ {
            worker := <-p.workerQueue
            worker.stop <- struct{}{}
            <-worker.stop
         }
         p.terminated <- struct{}{}
         return
      }
   }
}

type worker struct {
   workerQueue chan *worker
   jobChannel  chan Job
   jobRet      chan struct{}
   stop        chan struct{}
}

func (w *worker) start() {
   go func() {
      for {
         w.workerQueue <- w
         var job Job
         select {
         case job = <-w.jobChannel:
            job()
            w.jobRet <- struct{}{}
         case <-w.stop:
            w.stop <- struct{}{}
            return
         }
      }
   }()
}

func newWorker(workerQueue chan *worker, jobRet chan struct{}) *worker {
   worker := &worker{
      workerQueue: workerQueue,
      jobChannel:  make(chan Job),
      jobRet:      jobRet,
      stop:        make(chan struct{}),
   }

   worker.start()
   return worker
}

值得说明的一些细节:

  1. 提交任务时使用一个锁来保证多个协程并发提交任务时,总任务数能够正确计数
  2. channel发送一个信号后等待反馈:
worker.stop <- struct{}{}  // 给工作协程发送一个stop信号
<-worker.stop              // 等待工作协程成功stop后的反馈信号
  1. 等待执行结束的方法中多级select体现接收信号的优先级关系(源自多情况同时满足时,select会随机选择一个的语言特性):
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool {
   // 启动协程池
   p.start()

   // 等待运行结束
   completed := 0
   for completed < p.jobCount {
      // 外层的select优先级高,当有任务完成的信号和terminate信号同时出现时,优先选择terminate信号
      select {
      case <-p.terminated:
         return false
      case <-time.After(timeout):
         return false
      default:
         // 内层的select优先级低,能够同时处理三种支持的信号
         select {
         case <-p.jobRet:
            completed += 1
         case <-p.terminated:
            return false
         case <-time.After(timeout):
            return false
         }
      }
   }

   return true
}
  1. 停止协程池时,需要对所有的工作协程发出stop信号:
for i := 0; i < cap(p.workerQueue); i++ {
    worker := <-p.workerQueue
    worker.stop <- struct{}{}
    <-worker.stop
 }

注意这里的遍历方式,而不是for…range语法。使用range语法可能不是遍历所有工作协程,当工作协程处于执行中的状态时,按照上述实现它是不会被遍历到的