目录
1. 为什么需要协程池?
虽然go语言自带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低(大约2KB左右,线程通常2M左右),性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量。
但是,如果无休止的开辟Goroutine依然会出现高频率的调度Groutine,那么依然会浪费很多上下文切换的资源,导致做无用功。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。
2. 简单的协程池
package main
import (
"fmt"
"time"
)
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
f func() error //一个无参的函数类型
}
//通过NewTask来创建一个Task
func NewTask(f func() error) *Task {
t := Task{
f: f,
}
return &t
}
//执行Task任务的方法
func (t *Task) Execute() {
t.f() //调用任务所绑定的函数
}
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
EntryChannel chan *Task //对外接收Task的入口
worker_num int //协程池最大worker数量,限定Goroutine的个数
JobsChannel chan *Task //协程池内部的任务就绪队列
}
//创建一个协程池
func NewPool(cap int) *Pool {
p := Pool{
EntryChannel: make(chan *Task),
worker_num: cap,
JobsChannel: make(chan *Task),
}
return &p
}
//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int) {
//worker不断的从JobsChannel内部任务队列中拿任务
for task := range p.JobsChannel {
//如果拿到任务,则执行task任务
task.Execute()
fmt.Println("worker ID ", work_ID, " 执行完毕任务")
}
}
//让协程池Pool开始工作
func (p *Pool) Run() {
//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
// 每一个Worker用一个Goroutine承载
for i := 0; i < p.worker_num; i++ {
fmt.Println("开启固定数量的Worker:", i)
go p.worker(i)
}
//2, 从EntryChannel协程池入口取外界传递过来的任务
// 并且将任务送进JobsChannel中
for task := range p.EntryChannel {
p.JobsChannel <- task
}
//3, 执行完毕需要关闭JobsChannel
close(p.JobsChannel)
fmt.Println("执行完毕需要关闭JobsChannel")
//4, 执行完毕需要关闭EntryChannel
close(p.EntryChannel)
fmt.Println("执行完毕需要关闭EntryChannel")
}
//主函数
func main() {
//创建一个Task
t := NewTask(func() error {
fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
return nil
})
//创建一个协程池,最大开启3个协程worker
p := NewPool(3)
//开一个协程 不断的向 Pool 输送打印一条时间的task任务
go func() {
for {
p.EntryChannel <- t
}
}()
//启动协程池p
p.Run()
}
3. go-playground/pool
上面的协程池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,我们可以看看go-playground/pool的源码实现,“源码面前,如同裸奔”。先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。
依赖包下载: go get "gopkg.in/go-playground/pool.v3"
package main
import (
"fmt"
"gopkg.in/go-playground/pool.v3"
"time"
)
func SendMail(int int) pool.WorkFunc {
fn := func(wu pool.WorkUnit) (interface{}, error) {
// sleep 1s 模拟发邮件过程
time.Sleep(time.Second * 1)
// 模拟异常任务需要取消
if int == 17 {
wu.Cancel()
}
if wu.IsCancelled() {
return false, nil
}
fmt.Println("send to", int)
return true, nil
}
return fn
}
func main() {
// 初始化groutine数量为20的pool
p := pool.NewLimited(20)
defer p.Close()
batch := p.Batch()
// 设置一个批量任务的过期超时时间
t := time.After(10 * time.Second)
go func() {
for i := 0; i < 100; i++ {
batch.Queue(SendMail(i)) // 往批量任务中添加workFunc任务
}
// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
batch.QueueComplete()
}()
// // 获取批量任务结果集, 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
r := batch.Results()
LOOP:
for {
select {
case <-t:
// 超时通知
fmt.Println("超时通知")
break LOOP
case email, ok := <-r:
// 读取结果集
if ok {
if err := email.Error(); err != nil {
fmt.Println("读取结果集错误,error info:", err.Error())
}
fmt.Println("错误结果集:", email.Value())
} else {
fmt.Println("finish")
break LOOP
}
}
}
}
go-playground/pool相比简单的协程池, 对pool, worker的状态有了很好的管理。但是在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。接下来了解下ants的设计和实现。
4. ants(推荐)
ants是一个受fasthttp启发的高性能协程池,fasthttp号称是比go原生的net/http快10倍,其原因之一就是采用了各种池化技术, ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。
package main
import (
"fmt"
"github.com/panjf2000/ants"
"sync"
"time"
)
//任务
func sendMail(i int, wg *sync.WaitGroup) func() {
var cnt int
return func() {
for {
time.Sleep(time.Second * 2)
fmt.Println("send mail to ", i)
cnt++
if cnt > 5 && i == 1 {
fmt.Println("退出协程ID:", i)
break
}
}
wg.Done()
}
}
func main() {
wg := sync.WaitGroup{}
//申请一个协程池对象
pool, _ := ants.NewPool(2)
//关闭协程池
defer pool.Release()
// 向pool提交任务
for i := 1; i <= 5; i++ {
pool.Submit(sendMail(i, &wg))
wg.Add(1)
}
wg.Wait()
}
源码中提到, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍。
源码参考: