尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量,本文带你看看针对不同场景和需求的协程数量控制方式,看看这些姿势你都会了吗?
场景如下,go中一个典型场景是,接受数据然后开协程处理,代码如下
// runTaskDataGenerator 产生数据
func runTaskDataGenerator(dataChan chan int) {
for i := 0; i < 100; i++ {
dataChan <- i
}
close(dataChan)
}
// runInfiniteTask 每来一个数据起协程处理任务
func runInfiniteTask(dataChan <-chan int) {
var wg sync.WaitGroup
for data := range dataChan {
wg.Add(1)
go func(data int) {
defer wg.Done()
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
func TestRunInfiniteTask(t *testing.T) {
dataChan := make(chan int)
go runTaskDataGenerator(dataChan)
go runNumGoroutineMonitor()
runInfiniteTask(dataChan)
}
runNumGoroutineMonitor监控协程数量
// runNumGoroutineMonitor 协程数量监控
func runNumGoroutineMonitor() {
log.Printf("协程数量->%d\n", runtime.NumGoroutine())
for {
select {
case <-time.After(time.Second):
log.Printf("协程数量->%d\n", runtime.NumGoroutine())
}
}
}
运行结果如下,可以看到**有多少数据就会起多少协程,如果任务处理时间较长,短时间可能出现大量协程**,耗尽资源,需要控制协程数量。
固定个数协程并发处理任务=== RUN TestRunInfiniteTask
2023/01/05 10:36:12 协程数量->6
2023/01/05 10:36:13 协程数量->105
2023/01/05 10:36:14 协程数量->105
— PASS: TestRunInfiniteTask (3.00s)
一般叫做Bounded/Fixed并发控制。
- 优点是简单,不复杂的并发任务这样简单处理即可。
- 缺点在于dataChan可能流量不不均衡,需要同时处理的任务多少在变动,但是对应的协程数量保持不变,要不就是任务处理堵塞要不就是存在多余的协程空闲。
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
var wg sync.WaitGroup
wg.Add(maxTaskNum)
for i := 0; i < maxTaskNum; i++ {
go func() {
defer wg.Done()
for data := range dataChan {
func(data int) {
// do something
time.Sleep(3 * time.Second)
}(data)
}
}()
}
wg.Wait()
}
动态个数协程并发处理任务
针对固定个数协程的缺点,一个思路是协程数量最好能够根据来的处理任务的多少,动态变更,指定一个并发上限,任务多时增加协程数量,任务少时减少协程数量。这里提供两种实现思路
自定义令牌池实现
令牌池维持最大允许并发任务数个令牌,每个任务启动时请求令牌,运行完成返回令牌。
// runDynamicTask
// 最大同时运行maxTaskNum个任务处理数据
// 自定义令牌池维持maxTaskNum个令牌供竞争
func runDynamicTask(dataChan <-chan int, maxTaskNum int) {
// 初始化令牌池
tokenPool := make(chan struct{}, maxTaskNum)
for i := 0; i < maxTaskNum; i++ {
tokenPool <- struct{}{}
}
var wg sync.WaitGroup
for data := range dataChan {
// 先获取令牌,如果被消费完则阻塞等待其它任务返还令牌
<-tokenPool
wg.Add(1)
go func(data int) {
defer wg.Done()
// 任务运行完成,返还令牌
defer func() {
tokenPool <- struct{}{}
}()
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
信号量Semaphore实现
同上,令牌池换成信号量。
// runSemaphoreTask
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
w := semaphore.NewWeighted(maxTaskNum)
var wg sync.WaitGroup
for data := range dataChan {
// 先获取信号量,如果被消费完则阻塞等待信号量返还
_ = w.Acquire(context.TODO(), 1)
wg.Add(1)
go func(data int) {
defer wg.Done()
// 运行完成返还信号量
defer w.Release(1)
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
指定处理速度并发处理任务
针对固定个数协程的缺点,另一个思路是借鉴限流器的实现,控制每个时刻最大允许协程数量也达到控制协程数量的目的。这里也提供两种实现思路
自定义令牌池实现
相当于一个简单限流器,指定速度生产令牌,每个任务启动时必须请求到令牌。
// runRateLimitTask 限制每秒允许的最大协程数量,限流器的思路
func runRateLimitTask(dataChan <-chan int) {
// 初始化令牌池
tokenPool := make(chan struct{})
go func() {
for {
select {
// 动态控制令牌生成速度
case <-time.After(time.Second):
tokenPool <- struct{}{}
}
}
}()
var wg sync.WaitGroup
for data := range dataChan {
// 先获取令牌,如果被消费完则阻塞等待新令牌产生
<-tokenPool
wg.Add(1)
go func(data int) {
defer wg.Done()
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
官方限流器实现
逻辑同上,每个任务启动必须先获取令牌。
// runRateLimitTask2 限制每秒允许的最大协程数量,使用官方限流器
func runRateLimitTask2(dataChan <-chan int) {
// 初始化令牌池
limit := rate.Every(time.Second) // 每秒一个
limiter := rate.NewLimiter(limit, 10)
var wg sync.WaitGroup
for data := range dataChan {
// 先获取令牌,如果被消费完则阻塞等待新令牌产生
_ = limiter.Wait(context.TODO())
wg.Add(1)
go func(data int) {
defer wg.Done()
// do something
time.Sleep(3 * time.Second)
}(data)
}
wg.Wait()
}
协程池并发处理任务
生产业务中,针对复杂业务或者不想那么麻烦,可以直接上协程池。
常用协程池https://github.com/panjf2000/ants,如下实现。
代码上看起来简洁很多,根本原理和动态个数协程控制思路差不多,后续单开一篇文章讲讲协程池的实现。
// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
p, _ := ants.NewPool(maxTaskNum)
defer p.Release()
var wg sync.WaitGroup
for _ = range dataChan {
wg.Add(1)
// 提交任务,协程池动态管理数量,可以做更多的分配优化策略
_ = p.Submit(func() {
defer wg.Done()
// do something
time.Sleep(3 * time.Second)
})
}
wg.Wait()
}
参考
演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/task_concurrency
https://zhuanlan.zhihu.com/p/568151296