目录

一. 利用 channel 的缓存区控制协程并发数量

  1. 查询一个问题代码,内部实现了math.MaxInt32 个协程的并发,系统的资源可能会被耗尽出现异常
func main() {
	var wg sync.WaitGroup
	for i := 0; i < math.MaxInt32; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println(i)
			time.Sleep(time.Second)
		}(i)
	}
	wg.Wait()
}
  1. 利用 channel 的缓存区控制协程并发数量
  1. make(chan struct{}, 3) 创建缓冲区大小为 3 的 channel,在没有被接收的情况下,至多发送 3 个消息则被阻塞。
  2. 开启协程前,调用 ch <- struct{}{},若缓存区满,则阻塞。
  3. 协程任务结束,调用 <-ch 释放缓冲区。
  4. sync.WaitGroup 并不是必须的,例如 http 服务,每个请求天然是并发的,此时使用 channel 控制并发处理的任务数量,就不需要 sync.WaitGroup
func main() {
	var wg sync.WaitGroup
	ch := make(chan struct{}, 3)
	for i := 0; i < 10; i++ {
		ch <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			log.Println(i)
			time.Sleep(time.Second)
			<-ch
		}(i)
	}
	wg.Wait()
}

二. 利用第三方库

  1. 目前有很多第三方库实现了协程池,可以很方便地用来控制协程的并发数量,比较受欢迎的有:
  1. Jeffail/tunny
  2. panjf2000/ants
  1. tunny 使用示例:
  1. tunny.NewFunc(3, f) 第一个参数是协程池的大小(poolSize),第二个参数是协程运行的函数(worker)。
  2. pool.Process(i) 将参数 i 传递给协程池定义好的 worker 处理。
  3. pool.Close() 关闭协程池
import (
	"log"
	"time"

	"github.com/Jeffail/tunny"
)

func main() {
	pool := tunny.NewFunc(3, func(i interface{}) interface{} {
		log.Println(i)
		time.Sleep(time.Second)
		return nil
	})
	defer pool.Close()

	for i := 0; i < 10; i++ {
		go pool.Process(i)
	}
	time.Sleep(time.Second * 4)
}