目录
一. 利用 channel 的缓存区控制协程并发数量
- 查询一个问题代码,内部实现了math.MaxInt32 个协程的并发,系统的资源可能会被耗尽出现异常
func main() {
var wg sync.WaitGroup
for i := 0; i < math.MaxInt32; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
time.Sleep(time.Second)
}(i)
}
wg.Wait()
}
- 利用 channel 的缓存区控制协程并发数量
- make(chan struct{}, 3) 创建缓冲区大小为 3 的 channel,在没有被接收的情况下,至多发送 3 个消息则被阻塞。
- 开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
- 协程任务结束,调用 <-ch 释放缓冲区。
- sync.WaitGroup 并不是必须的,例如 http 服务,每个请求天然是并发的,此时使用 channel 控制并发处理的任务数量,就不需要 sync.WaitGroup
func main() {
var wg sync.WaitGroup
ch := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
ch <- struct{}{}
wg.Add(1)
go func(i int) {
defer wg.Done()
log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}
wg.Wait()
}
二. 利用第三方库
- 目前有很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量,比较受欢迎的有:
- Jeffail/tunny
- panjf2000/ants
- tunny 使用示例:
- tunny.NewFunc(3, f) 第一个参数是协程池的大小(poolSize),第二个参数是协程运行的函数(worker)。
- pool.Process(i) 将参数 i 传递给协程池定义好的 worker 处理。
- pool.Close() 关闭协程池
import (
"log"
"time"
"github.com/Jeffail/tunny"
)
func main() {
pool := tunny.NewFunc(3, func(i interface{}) interface{} {
log.Println(i)
time.Sleep(time.Second)
return nil
})
defer pool.Close()
for i := 0; i < 10; i++ {
go pool.Process(i)
}
time.Sleep(time.Second * 4)
}