这里基于这篇写得很好的文章:

原版实现

在这篇文章中协程池结构为:

  1. 定义一个接口表示任务,每一个具体的任务实现这个接口。
  2. 使用 channel 作为任务队列,当有任务需要执行时,将这个任务插入到队列中。
  3. 开启固定的协程(worker)从任务队列中获取任务来执行。

上面这个协程池的特点:

  1. Go 程数量固定。可以将 worker 的数量设置为最大同时并发数 runtime.NumCPU()。
  2. Task 泛化。提供任务接口,支持多类型任务,不同业务场景下只要实现任务接口便可以提交到任务队列供 worker 调用。
  3. 简单易用。设计简约,实现简单,使用方便。
  • 原版代码如下:
package main

import (
  "fmt"
  "runtime"
  "sync"
  "time"
)

// Task 任务接口
type Task interface {
  Execute()
}

// Pool 协程池
type Pool struct {
  TaskChannel chan Task // 任务队列
}

// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
  // 获取 worker 数量
  var n int
  if len(cap) > 0 {
    n = cap[0]
  }
  if n == 0 {
    n = runtime.NumCPU()
  }

  p := &Pool{
    TaskChannel: make(chan Task),
  }

  // 创建指定数量 worker 从任务队列取出任务执行
  for i := 0; i < n; i++ {
    go func() {
      for task := range p.TaskChannel {
        task.Execute()
      }
    }()
  }
  return p
}

// Submit 提交任务
func (p *Pool) Submit(t Task) {
  p.TaskChannel <- t
}

// EatFood 吃饭任务
type EatFood struct {
  wg *sync.WaitGroup
}

func (e *EatFood) Execute() {
  defer e.wg.Done()
  fmt.Println("eat cost 3 seconds")
  time.Sleep(3 * time.Second)
}

// WashFeet 洗脚任务
type WashFeet struct {
  wg *sync.WaitGroup
}

func (w *WashFeet) Execute() {
  defer w.wg.Done()
  fmt.Println("wash feet cost 3 seconds")
  time.Sleep(3 * time.Second)
}

// WatchTV 看电视任务
type WatchTV struct {
  wg *sync.WaitGroup
}

func (w *WatchTV) Execute() {
  defer w.wg.Done()
  fmt.Println("watch tv cost 3 seconds")
  time.Sleep(3 * time.Second)
}

func main() {
  p := NewPool()
  var wg sync.WaitGroup
  wg.Add(3)
  task1 := &EatFood{
    wg: &wg,
  }
  task2 := &WashFeet{
    wg: &wg,
  }
  task3 := &WatchTV{
    wg: &wg,
  }
  p.Submit(task1)
  p.Submit(task2)
  p.Submit(task3)
  // 等待所有任务执行完成
  wg.Wait()
}

改进1

将任务队列中的任务设计为无参匿名函数,这样子使用起来可能会更简单。一些开源协程池,例如 panjf2000/ants 也正是这样用的。

type Pool struct {
  TaskChannel chan func() // 任务队列
}

所以基于此对以上协程池进行改进:

package main

import (
  "fmt"
  "runtime"
  "sync"
  "time"
)

// Pool 协程池
type Pool struct {
  TaskChannel chan func() // fuc类型任务队列
}

// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
  // 获取 worker 数量
  var n int
  if len(cap) > 0 {
    n = cap[0]
  }
  if n == 0 {
    n = runtime.NumCPU() // 默认等于CPU线程数
  }
  // 初始化 Pool.TaskChannel
  p := &Pool{
    TaskChannel: make(chan func()),
  }

  // 创建指定数量 worker 从任务队列取出任务执行
  for i := 0; i < n; i++ {
    go func() {
      for task := range p.TaskChannel {
        task()    // 取出的即位 func 类型,直接加括号即运行
      }
    }()
  }
  return p
}

// Submit 提交任务
func (p *Pool) Submit(f func()) {
  p.TaskChannel <- f
}

func main() {
  p := NewPool()
  var wg sync.WaitGroup
  wg.Add(3)
  task1 := func() {
    fmt.Println("eat cost 3 seconds")
    time.Sleep(3 * time.Second)
    wg.Done()
  }
  task2 := func() {
    defer wg.Done()
    fmt.Println("wash feet cost 3 seconds")
    time.Sleep(3 * time.Second)
  }
  task3 := func() {
    fmt.Println("watch tv cost 3 seconds")
    time.Sleep(3 * time.Second)
    wg.Done()
  }
  p.Submit(task1)
  p.Submit(task2)
  p.Submit(task3)
  // 等待所有任务执行完成
  wg.Wait()
}

几处需要注意的地方:

func NewPool(cap ...int) *Poolcap ...intTaskChannel()mainfunc()TaskChannelmainsync.WaitGroupwg.Done()defer

改进2

Pool structGoNum
package main

import (
  "fmt"
  "runtime"
  "sync"
  "time"
)

// Pool 协程池
type Pool struct {
  TaskChannel chan func() // fuc类型任务队列
  GoNum       int         // 任务数量
}

// NewPool 创建一个协程池
func NewPool(cap ...int) *Pool {
  // 获取 worker 数量
  var n int
  if len(cap) > 0 {
    n = cap[0]
  }
  if n == 0 {
    n = runtime.NumCPU() // 默认等于CPU线程数
  }
  // p 是 Pool的引用
  p := &Pool{
    TaskChannel: make(chan func()),
    GoNum:       n,
  }
  return p
}

// StartPool 启动协程池
func StartPool(p *Pool) {
  // 创建指定数量 worker 从任务队列取出任务执行
  for i := 0; i < p.GoNum; i++ {
    go func() {
      for task := range p.TaskChannel {
        task()
      }
    }()
  }
}

// Submit 提交任务
func (p *Pool) Submit(f func()) {
  p.TaskChannel <- f
}

func main() {
  p := NewPool()
  StartPool(p)
  var wg sync.WaitGroup
  wg.Add(3)
  task1 := func() {
    fmt.Println("eat cost 3 seconds")
    time.Sleep(3 * time.Second)
    wg.Done()
  }
  task2 := func() {
    defer wg.Done()
    fmt.Println("wash feet cost 3 seconds")
    time.Sleep(3 * time.Second)
  }
  task3 := func() {
    fmt.Println("watch tv cost 3 seconds")
    time.Sleep(3 * time.Second)
    wg.Done()
  }
  p.Submit(task1)
  p.Submit(task2)
  p.Submit(task3)
  // 等待所有任务执行完成
  wg.Wait()
}

上面这些协程池,设计简约,实现和使用起来也比较简单方便,但是严格来说,其并不是一个成熟的协程池,因为并没有提供 worker 与 go 程池的状态控制能力,worker 数量也无法根据节点算力和业务晚高峰时进行动态的扩增和缩减。

如果没有动态扩缩容的能力,那么很有可能出现 go 程的并发量不足以完全利用节点的算力,或者请求量不足的情况下,出现部分 go 程长期空闲的情况。

总地来说上面简易协程池的不足:

  1. 无法知道 worker 与 pool 的状态;
  2. worker 数量不足无法动态扩增;
  3. worker 数量过多无法自动缩减。