在 Golang 中要创建一个协程是一件无比简单的事情,你只要定义一个函数,并使用 go 关键字去执行它就行了。
如果你接触过其他语言,会发现你在使用使用线程时,为了减少线程频繁创建销毁还来的开销,通常我们会使用线程池来复用线程。
池化技术就是利用复用来提升性能的,那在 Golang 中需要协程池吗?
在 Golang 中,goroutine 是一个轻量级的线程,他的创建、调度都是在用户态进行,并不需要进入内核,这意味着创建销毁协程带来的开销是非常小的。
因此,我认为大多数情况下,开发人员是不太需要使用协程池的。
但也不排除有某些场景下是需要这样做,因为我还没有遇到就不说了。
抛开是否必要这个问题,单纯从技术的角度来看,我们可以怎样实现一个通用的协程池呢?
下面就来一起学习一下我的写法
首先定义一个协程池(Pool)结构体,包含两个属性,都是 chan 类型的。
一个是 work,用于接收 task 任务
一个是 sem,用于设置协程池大小,即可同时执行的协程数量
type Pool struct {
work chan func() // 任务
sem chan struct{} // 数量
}
然后定义一个 New 函数,用于创建一个协程池对象,有一个细节需要注意
work 是一个无缓冲通道
而 sem 是一个缓冲通道,size 大小即为协程池大小
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
最后给协程池对象绑定两个函数
1、NewTask:往协程池中添加任务
当第一次调用 NewTask 添加任务的时候,由于 work 是无缓冲通道,所以会一定会走第二个 case 的分支:使用 go worker 开启一个协程。
func (p *Pool) NewTask(task func()) {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
2、worker:用于执行任务
为了能够实现协程的复用,这个使用了 for 无限循环,使这个协程在执行完任务后,也不退出,而是一直在接收新的任务。
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }()
for {
task()
task = <-p.work
}
}
这两个函数是协程池实现的关键函数,里面的逻辑很值得推敲:
1、如果设定的协程池数大于 2,此时第二次传入往 NewTask 传入task,select case 的时候,如果第一个协程还在运行中,就一定会走第二个case,重新创建一个协程执行task
2、如果传入的任务数大于设定的协程池数,并且此时所有的任务都还在运行中,那此时再调用 NewTask 传入 task ,这两个 case 都不会命中,会一直阻塞直到有任务执行完成,worker 函数里的 work 通道才能接收到新的任务,继续执行。
以上便是协程池的实现过程。
使用它也很简单,看下面的代码你就明白了
func main() {
pool := New(128)
pool.NewTask(func(){
fmt.Println("run task")
})
}
为了让你看到效果,我设置协程池数为 2,开启四个任务,都是 sleep 2 秒后,打印当前时间。
func main() {
pool := New(2)
for i := 1; i <5; i++{
pool.NewTask(func(){
time.Sleep(2 * time.Second)
fmt.Println(time.Now())
})
}
// 保证所有的协程都执行完毕
time.Sleep(5 * time.Second)
}
执行结果如下,可以看到总共 4 个任务,由于协程池大小为 2,所以 4 个任务分两批执行(从打印的时间可以看出)
2020-05-24 23:18:02.014487 +0800 CST m=+2.005207182
2020-05-24 23:18:02.014524 +0800 CST m=+2.005243650
2020-05-24 23:18:04.019755 +0800 CST m=+4.010435443
2020-05-24 23:18:04.019819 +0800 CST m=+4.010499440