之前在工作中,有使用到协程池,特在此记录下

逻辑分析

要实现协程池,首先要明确协程池的必要条件

工作对象

Task,即原子工作任务,任务池及工作池以

并发数

maxWorkerNum 自定义性的并发控制

任务池

EntryChan 实现任务输入队列

工作池

WorkerChan 实现任务分发队列

上下文管理器

Context 实现协程池的中断控制

代码实现

type Task struct {
	Params   map[string]interface{}
	CallBack func(map[string]interface{})
}

// Pool 协程池 无缓冲,任务推送完成后需主动关闭Entry
type Pool struct {
	maxWorkerNum int
	WorkerChan   chan *Task
	EntryChan    chan *Task
	Context      context.Context
}

func NewGoPool(ctx context.Context, maxWorkerNum int) *Pool {
	return &Pool{
		Context:      ctx,
		maxWorkerNum: maxWorkerNum,
		WorkerChan:   make(chan *Task),
		EntryChan:    make(chan *Task),
	}
}

func (p Pool) work(ctx context.Context) {
	defer func() {
		if _, ok := <- p.WorkerChan; ok {
			close(p.WorkerChan)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-p.WorkerChan:
			if task == nil {
				continue
			}
			task.CallBack(task.Params)
		}
	}
}

func (p Pool) Run() {

	for i := 0; i < p.maxWorkerNum; i++ {
		go p.work(p.Context)
	}

	for task := range p.EntryChan {
		p.WorkerChan <- task
	}

}