试想这样一种情况,如果给每个客户端都分配一个 session 来维护连接,然后每个 session 会启动3个 goroutine,一个用来读取消息,一个用来发送消息,一个用来检测生命周期。那么,当大量客户端连接时,那 goroutine 数量就是3倍客户端的数量啊。即使 goroutine 再轻量,几百万的客户都涌上来的时候,内存也绷不住啊。
golang 号称可以百万级别并发,但 goroutine 也不应该无限制的创建吧,毕竟每次都向系统申请内存,系统内存总有耗尽的一天吧。那有没有一个池一样的东西,可以让 goroutine 可以重用,而不需要不节制的创建呢?
也就是说,有什么方案可以减缓大规模 Goroutine 对系统的调度和内存压力?要想解决问题,最重要的是找到造成问题的根源,这个问题根源是什么?Goroutine 的数量过多导致资源侵占,那要解决这个问题就要限制运行的 Goroutine 数量,合理复用,节省资源,具体就是 — Goroutine池化。
这就诞生了 Goroutine 池的概念。Goroutine 池不是 golang 官方给出的一个概念,而是程序员在实践中发现存在上述问题而给出的一种用于解决使用 goroutine 时所产生的实际问题的思路。
试想,Goroutine 池中预先保存一定数量的 Goroutine ,而新任务将不再以创建新 Goroutine 的方式去执行,而是将任务发布到任务队列,Goroutine 池中的 Goroutine 不断的从任务队列中取出任务并执行,可以有效的减少 Goroutine 创建和销毁所带来的开销。
总结一下为什么要实现 Goroutine 池:
- 即便每个goroutine只分配4KB的内存,但如果是恐怖如斯的数量,聚少成多,内存会占用过高。
- 会对GC造成极大的负担,首先GC会在回收 goroutine 上消耗性能,其次GC本身也是 goroutine ,内存吃紧的状态下连GC的调度都会出现问题。
- 提高响应速度,减少创建协程的时间。
- 更好的管理协程,控制最大并发数量,定期回收。
- Goroutine 池是一个池子,里面有一些 Goroutine 。
- 这个池子有一个最大容量,其内部的 Goroutine 数量不能超过其最大容量。
- 可以将池子中的每个 Goroutine 看作是一个 worker ,用于执行任务。
更准确的说,Goroutine 池是一个架构。该架构由两部分组成:
- 一个池子,里面有一些 Goroutine 。
- 一个任务队列,里面放着给池子里的 Goroutine 执行的任务。
新来了一个任务,如果池子存满了 Goroutine ,而且它们都在工作,那么就将该任务放入任务队列,等待被处理;
如果池子没满,就新开一个 Goroutine 去处理该任务。
1. Goroutine 池只是一个抽象的概念
Golang 没有封装好的线程池。
Goroutine 池只是一个概念,需要我们自己写代码时有意识地实现 Goroutine 池。
2. Goroutine 池的设计思路
- 启动服务的时候初始化一个 Goroutine Pool,这个协程池维护了 任务的管道 和 worker(也就是 Goroutine)。
- 外部将请求投递到 Goroutine Pool,Goroutine Pool 的操作是:判断当前运行的 worker 是否已经超过 Pool 的容量,如果超过就将请求放到任务管道中直到运行的 worker 将管道中的任务执行;如果没有超过就新开一个 worker 处理。
3. 生产者消费者模型
在这个 投递 —> 等待 —> 执行 的过程中,我们很容易想到生产者消费者模型:
生产者 --(生产任务)–> 队列 --(消费任务)–> 消费者
实际上,用来执行任务的 goroutine 就是消费者,操作任务池的 goroutine 就是生产者, 而队列则可以使用 go 的 buffer channel,至此,任务池的建模到此结束。
四、一个实现 Goroutine 池的实例题目:
- 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6。
- 随机生成数字进行计算。
package main
import (
"fmt"
"math/rand"
)
type Job struct { //任务
// id
Id int
// 需要计算的随机数
RandNum int
}
type Result struct { //结果
// 这里必须传对象实例
job *Job
// 求和
sum int
}
func main() {
// 需要2个管道
// 任务管道
jobChan := make(chan *Job, 128)
// 结果管道
resultChan := make(chan *Result, 128)
// 工作池(goroutine池)
createPool(64, jobChan, resultChan) //工作池中有64个Goroutine在工作
// 负责打印的协程
go func(resultChan chan *Result) {
// 遍历结果管道,进行打印
for result := range resultChan { //从通道resultChan接收值
fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
result.job.RandNum, result.sum)
}
}(resultChan) //给函数传入参数,立即执行
//主协程
var id int
// 循环创建job,输入到管道
for {
id++
// 生成随机数
r_num := rand.Int()
job := &Job{
Id: id,
RandNum: r_num,
}
jobChan <- job
}
}
// 函数createPool:创建工作池
// 工作池里的Goroutine负责计算从jobChan取数字,然后计算各位之和,再输出到resultChan
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
// 开 num 个协程,做计算工作
for i := 0; i < num; i++ {
go func(jobChan chan *Job, resultChan chan *Result) {
// 执行运算
// 遍历job管道所有数据,进行相加
for job := range jobChan {
// 随机数接过来
r_num := job.RandNum //读取随机数
// 随机数每一位相加
// 定义返回值
var sum int
for r_num != 0 {
tmp := r_num % 10
sum += tmp
r_num /= 10
}
// 想要的结果是Result
r := &Result{
job: job,
sum: sum,
}
//运算结果扔到管道
resultChan <- r
}
}(jobChan, resultChan)
}
}
输出结果的冰山一角:
job id:62596 randnum:3542998448878732054 result:100
job id:62597 randnum:1412622303680101805 result:53
job id:62598 randnum:2405699357934002636 result:83
job id:62599 randnum:729922549175030513 result:74
job id:62600 randnum:8887708274878274993 result:116
job id:62601 randnum:5826041766204926306 result:77
job id:62602 randnum:5892735228296506585 result:97
job id:62603 randnum:5715834663741984020 result:83
job id:62604 randnum:3141293823040601058 result:60
job id:62605 randnum:8841672571116882462 result:87
job id:62606 randnum:3482484307097615774 result:89
job id:62607 randnum:8196261471617693666 result:95
job id:62608 randnum:8692192048687145566 result:97
job id:62609 randnum:2639146648002715963 result:82
job id:62610 randnum:3323717745941790047 result:83
job id:62611 randnum:7567210708242192793 result:82
解释:
如上图,本程序由两个协程、一个协程池、两个管道构成。
job 协程不断产生任务,然后放入 jobchan 任务队列;协程池中有64个协程,不断地从任务队列 jobchan 中取任务来执行,将结果放入 resultchan 任务队列;printf 协程从 resultchan 任务队列中取出结果来进行打印。
其中,我们可以看到两对生产者消费者:job 协程和协程池是一对生产者消费者,协程池和printf 协程也是一对生产者消费者。
说明:
在这个例子中,两个管道和协程池可以理解为 “Goroutine 池”。
它的作用是一直在用有限的 Goroutine 数量进行一项无限的工作。
试想,如果没有这两个管道和协程池:
job 协程一直在产生任务,来一个任务就要创建一个协程计算它…那将是无数个协程…
项目地址:ants
六、总结worker pool(goroutine池):
- 本质上是生产者消费者模型
- 可以有效控制 goroutine 数量,防止暴涨