目录

1. 为什么需要协程池?

虽然go语言自带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低(大约2KB左右,线程通常2M左右),性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量。

但是,如果无休止的开辟Goroutine依然会出现高频率的调度Groutine,那么依然会浪费很多上下文切换的资源,导致做无用功。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。

2. 简单的协程池

package main

import (
	"fmt"
	"time"
)

/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
	f func() error //一个无参的函数类型
}

//通过NewTask来创建一个Task
func NewTask(f func() error) *Task {
	t := Task{
		f: f,
	}
	return &t
}

//执行Task任务的方法
func (t *Task) Execute() {
	t.f() //调用任务所绑定的函数
}

/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
	EntryChannel chan *Task //对外接收Task的入口
	worker_num   int        //协程池最大worker数量,限定Goroutine的个数
	JobsChannel  chan *Task //协程池内部的任务就绪队列
}

//创建一个协程池
func NewPool(cap int) *Pool {
	p := Pool{
		EntryChannel: make(chan *Task),
		worker_num:   cap,
		JobsChannel:  make(chan *Task),
	}
	return &p
}

//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int) {
	//worker不断的从JobsChannel内部任务队列中拿任务
	for task := range p.JobsChannel {
		//如果拿到任务,则执行task任务
		task.Execute()
		fmt.Println("worker ID ", work_ID, " 执行完毕任务")
	}
}

//让协程池Pool开始工作
func (p *Pool) Run() {
	//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
	//  每一个Worker用一个Goroutine承载
	for i := 0; i < p.worker_num; i++ {
		fmt.Println("开启固定数量的Worker:", i)
		go p.worker(i)
	}

	//2, 从EntryChannel协程池入口取外界传递过来的任务
	//   并且将任务送进JobsChannel中
	for task := range p.EntryChannel {
		p.JobsChannel <- task
	}

	//3, 执行完毕需要关闭JobsChannel
	close(p.JobsChannel)
	fmt.Println("执行完毕需要关闭JobsChannel")

	//4, 执行完毕需要关闭EntryChannel
	close(p.EntryChannel)
	fmt.Println("执行完毕需要关闭EntryChannel")
}

//主函数
func main() {
	//创建一个Task
	t := NewTask(func() error {
		fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
		return nil
	})

	//创建一个协程池,最大开启3个协程worker
	p := NewPool(3)

	//开一个协程 不断的向 Pool 输送打印一条时间的task任务
	go func() {
		for {
			p.EntryChannel <- t
		}
	}()

	//启动协程池p
	p.Run()
}

3. go-playground/pool

上面的协程池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,我们可以看看go-playground/pool的源码实现,“源码面前,如同裸奔”。先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。

依赖包下载: go get "gopkg.in/go-playground/pool.v3"

package main

import (
	"fmt"
	"gopkg.in/go-playground/pool.v3"
	"time"
)

func SendMail(int int) pool.WorkFunc {
	fn := func(wu pool.WorkUnit) (interface{}, error) {
		// sleep 1s 模拟发邮件过程
		time.Sleep(time.Second * 1)
		// 模拟异常任务需要取消
		if int == 17 {
			wu.Cancel()
		}
		if wu.IsCancelled() {
			return false, nil
		}
		fmt.Println("send to", int)
		return true, nil
	}
	return fn
}

func main() {
	// 初始化groutine数量为20的pool
	p := pool.NewLimited(20)
	defer p.Close()
	batch := p.Batch()
	// 设置一个批量任务的过期超时时间
	t := time.After(10 * time.Second)
	go func() {
		for i := 0; i < 100; i++ {
			batch.Queue(SendMail(i)) // 往批量任务中添加workFunc任务
		}
		// 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞
		batch.QueueComplete()
	}()
	// // 获取批量任务结果集, 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行
	r := batch.Results()
LOOP:
	for {
		select {
		case <-t:
			// 超时通知
			fmt.Println("超时通知")
			break LOOP
		case email, ok := <-r:
			// 读取结果集
			if ok {
				if err := email.Error(); err != nil {
					fmt.Println("读取结果集错误,error info:", err.Error())
				}
				fmt.Println("错误结果集:", email.Value())
			} else {
				fmt.Println("finish")
				break LOOP
			}
		}
	}
}

go-playground/pool相比简单的协程池, 对pool, worker的状态有了很好的管理。但是在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。接下来了解下ants的设计和实现。

4. ants(推荐)

ants是一个受fasthttp启发的高性能协程池,fasthttp号称是比go原生的net/http快10倍,其原因之一就是采用了各种池化技术, ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。

 

package main

import (
	"fmt"
	"github.com/panjf2000/ants"
	"sync"
	"time"
)

//任务
func sendMail(i int, wg *sync.WaitGroup) func() {
	var cnt int
	return func() {
		for {
			time.Sleep(time.Second * 2)
			fmt.Println("send mail to ", i)
			cnt++
			if cnt > 5 && i == 1 {
				fmt.Println("退出协程ID:", i)
				break
			}
		}
		wg.Done()
	}
}

func main() {
	wg := sync.WaitGroup{}

	//申请一个协程池对象
	pool, _ := ants.NewPool(2)

	//关闭协程池
	defer pool.Release()

	// 向pool提交任务
	for i := 1; i <= 5; i++ {
		pool.Submit(sendMail(i, &wg))
		wg.Add(1)
	}
	wg.Wait()
}

源码中提到, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍。

源码参考: