对于支持CSP并发编程模型的Golang而言,要实现一个协程池是非常简单的。对于习惯了基于线程模型完成并发编程的开发同学,可能初次接触会有点难理解,但是俗话说"书读百遍其义自见",百来行的代码也并不多。
我们的目标是实现一个具有以下特性的协程池(熟悉Java的话,基本上就是实现了ExecutorService接口中的主要方法):
- 能够指定任务队列长度和工作协程的数量使用任务队列
- 能够支持启动和停止
- 能够等待投递到其中的任务执行完毕(等待时间可以指定)
核心的三个数据结构:
- 协程池
- 工作协程
- 任务
直接上代码:
package pool
import (
"sync"
"time"
)
const (
defaultWorkerQueueLength = 10 // 默认工作协程数量
defaultJobQueueLength = 1000 // 默认任务队列长度
)
type Job func()
type TimeoutPool struct {
workerQueue chan *worker
jobQueue chan Job
jobCount int
jobRet chan struct{}
stop chan struct{}
terminated chan struct{}
lock sync.Mutex
}
// 初始化一个带有执行超时时间的协程池,协程数量:10;任务队列长度1000
func NewTimeoutPoolWithDefaults() *TimeoutPool {
return NewTimeoutPool(defaultWorkerQueueLength, defaultJobQueueLength)
}
// 初始化一个带有执行超时时间的协程池,指定worker数量以及任务队列长度
func NewTimeoutPool(workerQueueLen, jobQueueLen int) *TimeoutPool {
pool := &TimeoutPool{
workerQueue: make(chan *worker, workerQueueLen),
jobQueue: make(chan Job, jobQueueLen),
jobRet: make(chan struct{}, jobQueueLen),
stop: make(chan struct{}),
terminated: make(chan struct{}),
}
return pool
}
// 停止协程池运行,如果有正在运行中的任务会等待其运行完毕
func (p *TimeoutPool) Terminate() {
p.stop <- struct{}{}
}
// 提交一个任务到协程池
func (p *TimeoutPool) Submit(job Job) {
p.jobQueue <- job
p.lock.Lock()
p.jobCount += 1
p.lock.Unlock()
}
// 启动并等待协程池内的运行全部运行结束 - 如果没有主动停止,如果有任务还在执行中会一直等待
// 如果返回true表示在规定时间范围内成功结束;返回false表示主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWaitUntilTerminated() bool {
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount {
select {
case <-p.terminated:
return false
default:
select {
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
}
}
}
return true
}
// 启动并等待协程池内的运行全部运行结束
// 如果返回true表示在规定时间范围内成功结束;返回false表示运行整体超时或者主动停止
// 注意:最终应该只有一个协程来调用Wait等待协程池运行结束,否则其中的计数存在竞态条件问题
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool {
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount {
select {
case <-p.terminated:
return false
case <-time.After(timeout):
return false
default:
select {
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
case <-time.After(timeout):
return false
}
}
}
return true
}
// ~~ 内部实现
func (p *TimeoutPool) start() {
for i := 0; i < cap(p.workerQueue); i++ {
newWorker(p.workerQueue, p.jobRet)
}
go p.dispatch()
}
func (p *TimeoutPool) dispatch() {
for {
var job Job
select {
case job = <-p.jobQueue:
worker := <-p.workerQueue
worker.jobChannel <- job
case <-p.stop:
for i := 0; i < cap(p.workerQueue); i++ {
worker := <-p.workerQueue
worker.stop <- struct{}{}
<-worker.stop
}
p.terminated <- struct{}{}
return
}
}
}
type worker struct {
workerQueue chan *worker
jobChannel chan Job
jobRet chan struct{}
stop chan struct{}
}
func (w *worker) start() {
go func() {
for {
w.workerQueue <- w
var job Job
select {
case job = <-w.jobChannel:
job()
w.jobRet <- struct{}{}
case <-w.stop:
w.stop <- struct{}{}
return
}
}
}()
}
func newWorker(workerQueue chan *worker, jobRet chan struct{}) *worker {
worker := &worker{
workerQueue: workerQueue,
jobChannel: make(chan Job),
jobRet: jobRet,
stop: make(chan struct{}),
}
worker.start()
return worker
}
值得说明的一些细节:
- 提交任务时使用一个锁来保证多个协程并发提交任务时,总任务数能够正确计数
- channel发送一个信号后等待反馈:
worker.stop <- struct{}{} // 给工作协程发送一个stop信号
<-worker.stop // 等待工作协程成功stop后的反馈信号
- 等待执行结束的方法中多级select体现接收信号的优先级关系(源自多情况同时满足时,select会随机选择一个的语言特性):
func (p *TimeoutPool) StartAndWait(timeout time.Duration) bool {
// 启动协程池
p.start()
// 等待运行结束
completed := 0
for completed < p.jobCount {
// 外层的select优先级高,当有任务完成的信号和terminate信号同时出现时,优先选择terminate信号
select {
case <-p.terminated:
return false
case <-time.After(timeout):
return false
default:
// 内层的select优先级低,能够同时处理三种支持的信号
select {
case <-p.jobRet:
completed += 1
case <-p.terminated:
return false
case <-time.After(timeout):
return false
}
}
}
return true
}
- 停止协程池时,需要对所有的工作协程发出stop信号:
for i := 0; i < cap(p.workerQueue); i++ {
worker := <-p.workerQueue
worker.stop <- struct{}{}
<-worker.stop
}
注意这里的遍历方式,而不是for…range语法。使用range语法可能不是遍历所有工作协程,当工作协程处于执行中的状态时,按照上述实现它是不会被遍历到的