一、大体流程和思路
基本思路:协程池有三个参数,协程的最大数,任务队列,等待处理结束。先把协程池启动起来,
然后把要处理的事情放进队列里面,所有协程从队列里面拿事情去处理,等待协程处理完毕所有事情,
关闭协程池。
二、具体做法
定义一个结构体
//具体的任务
type Job struct {
Fn func() error
Wait chan error
}
// 协程池
// 1. 生产者可以等待消费完成后, 退出
type Pool struct {
max int //最大的协程数量
quitC chan struct{} //退出
finishC chan struct{} //是否结束
job chan *Job //要执行的任务
wg sync.WaitGroup //用于协程等待
}
初始化一个协程池并启动池子
//初始化协程池
func NewPool(max int, queue int64) *Pool {
p := &Pool{
max: max,
quitC: make(chan struct{}),
finishC: make(chan struct{}),
job: make(chan *Job, queue),
wg: sync.WaitGroup{},
}
go p.start()
return p
}
//开启协程池
func (w *Pool) start() {
defer func() {
recover()
w.quit() //关闭quitC通道
}()
for i := 0; i < w.max; i++ { //循环开启max个协程
w.wg.Add(1)
go w.work() //每个协程死循环运行 啥时候停止
}
w.wg.Wait()
}
//具体的每一个协程
func (w *Pool) work() {
defer func() {
recover() //这个方法的作用是捕捉异常 并且恢复正常的执行
w.wg.Done()
}()
var idle bool
var finished bool
//一直循环取任务做 直到没任务可做
for {
select {
case job := <-w.job:
err := job.Fn()
if job.Wait != nil {
job.Wait <- err
close(job.Wait)
}
idle = false
default:
if finished {
return
}
if idle {
time.Sleep(1 * time.Millisecond)
}
idle = true
}
if !finished {
select {
case <-w.finishC:
finished = true
default:
}
}
}
}
//添加任务到队列
func (w *Pool) AddJob(job *Job) {
w.job <- job
}
func (w *Pool) AddFnJob(fn func() error) {
/**这里是做了一个错误处理
w.AddJob(&Job{
Fn: func() error {
return utils.RecoverWrapperError(fn)
},
})
**/
w.job <- &Job{
Fn: fn,
}
}
//关闭通道
func (w *Pool) quit() {
close(w.quitC)
}
// 协程池调用完成 等待
func (w *Pool) finish() {
close(w.finishC)
}
// 协程池等待退出
func (w *Pool) WaitFinish() {
w.finish()
<-w.quitC
}
三、外部调用协程池
func TestWorkerPool(t *testing.T) {
pool := NewPool(20, 10000) //总之这个协程初始化了并且启动起来了 job通道里面就
//只能缓冲10000个任务
for i := 0; i < 2000000; i++ { //这里会有200万个任务
pt:=i
fn := func() error {
atomic.AddInt32(&total, 1)
fmt.Println(pt)
return nil
}
pool.AddFnJob(fn) //往job通道里面放任务
}
fmt.Println("任务投放完毕")
pool.WaitFinish()
fmt.Println("执行完毕")
}
四、个人疑惑
虽然这个协程池写出来了,但是我没有发现太大的用处啊,每个请求过来还不是要继续创建协程啊,除非每个请求不需要初始化协程池(NewPool),然后协程池一直是启动状态,我每个请求过来只需要往job通道里面放任务就可以了,协程自动去抢任务,协程的数量能够根据任务动态变化。希望大佬们指点一下下要怎么做