之前在工作中,有使用到协程池,特在此记录下
逻辑分析
要实现协程池,首先要明确协程池的必要条件
工作对象
Task,即原子工作任务,任务池及工作池以
并发数
maxWorkerNum 自定义性的并发控制
任务池
EntryChan 实现任务输入队列
工作池
WorkerChan 实现任务分发队列
上下文管理器
Context 实现协程池的中断控制
代码实现
type Task struct {
Params map[string]interface{}
CallBack func(map[string]interface{})
}
// Pool 协程池 无缓冲,任务推送完成后需主动关闭Entry
type Pool struct {
maxWorkerNum int
WorkerChan chan *Task
EntryChan chan *Task
Context context.Context
}
func NewGoPool(ctx context.Context, maxWorkerNum int) *Pool {
return &Pool{
Context: ctx,
maxWorkerNum: maxWorkerNum,
WorkerChan: make(chan *Task),
EntryChan: make(chan *Task),
}
}
func (p Pool) work(ctx context.Context) {
defer func() {
if _, ok := <- p.WorkerChan; ok {
close(p.WorkerChan)
}
}()
for {
select {
case <-ctx.Done():
return
case task := <-p.WorkerChan:
if task == nil {
continue
}
task.CallBack(task.Params)
}
}
}
func (p Pool) Run() {
for i := 0; i < p.maxWorkerNum; i++ {
go p.work(p.Context)
}
for task := range p.EntryChan {
p.WorkerChan <- task
}
}