这里基于这篇写得很好的文章:
原版实现
在这篇文章中协程池结构为:
- 定义一个接口表示任务,每一个具体的任务实现这个接口。
- 使用 channel 作为任务队列,当有任务需要执行时,将这个任务插入到队列中。
- 开启固定的协程(worker)从任务队列中获取任务来执行。
上面这个协程池的特点:
- Go 程数量固定。可以将 worker 的数量设置为最大同时并发数 runtime.NumCPU()。
- Task 泛化。提供任务接口,支持多类型任务,不同业务场景下只要实现任务接口便可以提交到任务队列供 worker 调用。
- 简单易用。设计简约,实现简单,使用方便。
- 原版代码如下:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Task 任务接口
type Task interface {
Execute()
}
// Pool 协程池
type Pool struct {
TaskChannel chan Task // 任务队列
}
// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
// 获取 worker 数量
var n int
if len(cap) > 0 {
n = cap[0]
}
if n == 0 {
n = runtime.NumCPU()
}
p := &Pool{
TaskChannel: make(chan Task),
}
// 创建指定数量 worker 从任务队列取出任务执行
for i := 0; i < n; i++ {
go func() {
for task := range p.TaskChannel {
task.Execute()
}
}()
}
return p
}
// Submit 提交任务
func (p *Pool) Submit(t Task) {
p.TaskChannel <- t
}
// EatFood 吃饭任务
type EatFood struct {
wg *sync.WaitGroup
}
func (e *EatFood) Execute() {
defer e.wg.Done()
fmt.Println("eat cost 3 seconds")
time.Sleep(3 * time.Second)
}
// WashFeet 洗脚任务
type WashFeet struct {
wg *sync.WaitGroup
}
func (w *WashFeet) Execute() {
defer w.wg.Done()
fmt.Println("wash feet cost 3 seconds")
time.Sleep(3 * time.Second)
}
// WatchTV 看电视任务
type WatchTV struct {
wg *sync.WaitGroup
}
func (w *WatchTV) Execute() {
defer w.wg.Done()
fmt.Println("watch tv cost 3 seconds")
time.Sleep(3 * time.Second)
}
func main() {
p := NewPool()
var wg sync.WaitGroup
wg.Add(3)
task1 := &EatFood{
wg: &wg,
}
task2 := &WashFeet{
wg: &wg,
}
task3 := &WatchTV{
wg: &wg,
}
p.Submit(task1)
p.Submit(task2)
p.Submit(task3)
// 等待所有任务执行完成
wg.Wait()
}
改进1
将任务队列中的任务设计为无参匿名函数,这样子使用起来可能会更简单。一些开源协程池,例如 panjf2000/ants 也正是这样用的。
type Pool struct {
TaskChannel chan func() // 任务队列
}
所以基于此对以上协程池进行改进:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Pool 协程池
type Pool struct {
TaskChannel chan func() // fuc类型任务队列
}
// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
// 获取 worker 数量
var n int
if len(cap) > 0 {
n = cap[0]
}
if n == 0 {
n = runtime.NumCPU() // 默认等于CPU线程数
}
// 初始化 Pool.TaskChannel
p := &Pool{
TaskChannel: make(chan func()),
}
// 创建指定数量 worker 从任务队列取出任务执行
for i := 0; i < n; i++ {
go func() {
for task := range p.TaskChannel {
task() // 取出的即位 func 类型,直接加括号即运行
}
}()
}
return p
}
// Submit 提交任务
func (p *Pool) Submit(f func()) {
p.TaskChannel <- f
}
func main() {
p := NewPool()
var wg sync.WaitGroup
wg.Add(3)
task1 := func() {
fmt.Println("eat cost 3 seconds")
time.Sleep(3 * time.Second)
wg.Done()
}
task2 := func() {
defer wg.Done()
fmt.Println("wash feet cost 3 seconds")
time.Sleep(3 * time.Second)
}
task3 := func() {
fmt.Println("watch tv cost 3 seconds")
time.Sleep(3 * time.Second)
wg.Done()
}
p.Submit(task1)
p.Submit(task2)
p.Submit(task3)
// 等待所有任务执行完成
wg.Wait()
}
几处需要注意的地方:
func NewPool(cap ...int) *Poolcap ...intTaskChannel()mainfunc()TaskChannelmainsync.WaitGroupwg.Done()defer
改进2
Pool structGoNum
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Pool 协程池
type Pool struct {
TaskChannel chan func() // fuc类型任务队列
GoNum int // 任务数量
}
// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
// 获取 worker 数量
var n int
if len(cap) > 0 {
n = cap[0]
}
if n == 0 {
n = runtime.NumCPU() // 默认等于CPU线程数
}
// p 是 Pool的引用
p := &Pool{
TaskChannel: make(chan func()),
GoNum: n,
}
return p
}
// StartPool 启动协程池
func StartPool(p *Pool) {
// 创建指定数量 worker 从任务队列取出任务执行
for i := 0; i < p.GoNum; i++ {
go func() {
for task := range p.TaskChannel {
task()
}
}()
}
}
// Submit 提交任务
func (p *Pool) Submit(f func()) {
p.TaskChannel <- f
}
func main() {
p := NewPool()
StartPool(p)
var wg sync.WaitGroup
wg.Add(3)
task1 := func() {
fmt.Println("eat cost 3 seconds")
time.Sleep(3 * time.Second)
wg.Done()
}
task2 := func() {
defer wg.Done()
fmt.Println("wash feet cost 3 seconds")
time.Sleep(3 * time.Second)
}
task3 := func() {
fmt.Println("watch tv cost 3 seconds")
time.Sleep(3 * time.Second)
wg.Done()
}
p.Submit(task1)
p.Submit(task2)
p.Submit(task3)
// 等待所有任务执行完成
wg.Wait()
}
上面这些协程池,设计简约,实现和使用起来也比较简单方便,但是严格来说,其并不是一个成熟的协程池,因为并没有提供 worker 与 go 程池的状态控制能力,worker 数量也无法根据节点算力和业务晚高峰时进行动态的扩增和缩减。
如果没有动态扩缩容的能力,那么很有可能出现 go 程的并发量不足以完全利用节点的算力,或者请求量不足的情况下,出现部分 go 程长期空闲的情况。
总地来说上面简易协程池的不足:
- 无法知道 worker 与 pool 的状态;
- worker 数量不足无法动态扩增;
- worker 数量过多无法自动缩减。