尽管Goroutine(协程)非常清轻量,但是本身也是占用资源的,过多协程切换也会带来开销,总之物极必反,无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量,本文带你看看针对不同场景和需求的协程数量控制方式,看看这些姿势你都会了吗?

场景

如下,go中一个典型场景是,接受数据然后开协程处理,代码如下

// runTaskDataGenerator 产生数据
func runTaskDataGenerator(dataChan chan int) {
	for i := 0; i < 100; i++ {
		dataChan <- i
	}

	close(dataChan)
}

// runInfiniteTask 每来一个数据起协程处理任务
func runInfiniteTask(dataChan <-chan int) {
	var wg sync.WaitGroup

	for data := range dataChan {
		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

func TestRunInfiniteTask(t *testing.T) {
	dataChan := make(chan int)

	go runTaskDataGenerator(dataChan)
	go runNumGoroutineMonitor()

	runInfiniteTask(dataChan)
}

runNumGoroutineMonitor监控协程数量

// runNumGoroutineMonitor 协程数量监控
func runNumGoroutineMonitor() {
	log.Printf("协程数量->%d\n", runtime.NumGoroutine())

	for {
		select {
		case <-time.After(time.Second):
			log.Printf("协程数量->%d\n", runtime.NumGoroutine())
		}
	}
}

运行结果如下,可以看到**有多少数据就会起多少协程,如果任务处理时间较长,短时间可能出现大量协程**,耗尽资源,需要控制协程数量。

=== RUN TestRunInfiniteTask
2023/01/05 10:36:12 协程数量->6
2023/01/05 10:36:13 协程数量->105
2023/01/05 10:36:14 协程数量->105
— PASS: TestRunInfiniteTask (3.00s)

固定个数协程并发处理任务

一般叫做Bounded/Fixed并发控制。

  • 优点是简单,不复杂的并发任务这样简单处理即可。
  • 缺点在于dataChan可能流量不不均衡,需要同时处理的任务多少在变动,但是对应的协程数量保持不变,要不就是任务处理堵塞要不就是存在多余的协程空闲
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
	var wg sync.WaitGroup
	wg.Add(maxTaskNum)

	for i := 0; i < maxTaskNum; i++ {
		go func() {
			defer wg.Done()

			for data := range dataChan {
				func(data int) {

					// do something
					time.Sleep(3 * time.Second)
				}(data)
			}
		}()
	}

	wg.Wait()
}
动态个数协程并发处理任务

针对固定个数协程的缺点,一个思路是协程数量最好能够根据来的处理任务的多少,动态变更,指定一个并发上限,任务多时增加协程数量,任务少时减少协程数量。这里提供两种实现思路

自定义令牌池实现

令牌池维持最大允许并发任务数个令牌,每个任务启动时请求令牌,运行完成返回令牌。

// runDynamicTask 
// 最大同时运行maxTaskNum个任务处理数据
// 自定义令牌池维持maxTaskNum个令牌供竞争
func runDynamicTask(dataChan <-chan int, maxTaskNum int) {
	// 初始化令牌池
	tokenPool := make(chan struct{}, maxTaskNum)
	for i := 0; i < maxTaskNum; i++ {
		tokenPool <- struct{}{}
	}

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌,如果被消费完则阻塞等待其它任务返还令牌
		<-tokenPool

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// 任务运行完成,返还令牌
			defer func() {
				tokenPool <- struct{}{}
			}()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

信号量Semaphore实现

同上,令牌池换成信号量。

// runSemaphoreTask 
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
	w := semaphore.NewWeighted(maxTaskNum)

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取信号量,如果被消费完则阻塞等待信号量返还
		_ = w.Acquire(context.TODO(), 1)

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// 运行完成返还信号量
			defer w.Release(1)

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}
指定处理速度并发处理任务

针对固定个数协程的缺点,另一个思路是借鉴限流器的实现,控制每个时刻最大允许协程数量也达到控制协程数量的目的。这里也提供两种实现思路

自定义令牌池实现

相当于一个简单限流器,指定速度生产令牌,每个任务启动时必须请求到令牌。

// runRateLimitTask 限制每秒允许的最大协程数量,限流器的思路
func runRateLimitTask(dataChan <-chan int) {
	// 初始化令牌池
	tokenPool := make(chan struct{})
	go func() {
		for {
			select {
			// 动态控制令牌生成速度
			case <-time.After(time.Second):
				tokenPool <- struct{}{}
			}
		}
	}()

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌,如果被消费完则阻塞等待新令牌产生
		<-tokenPool

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

官方限流器实现

逻辑同上,每个任务启动必须先获取令牌。

// runRateLimitTask2 限制每秒允许的最大协程数量,使用官方限流器
func runRateLimitTask2(dataChan <-chan int) {
	// 初始化令牌池
	limit := rate.Every(time.Second) // 每秒一个
	limiter := rate.NewLimiter(limit, 10)

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌,如果被消费完则阻塞等待新令牌产生
		_ = limiter.Wait(context.TODO())

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}
协程池并发处理任务

生产业务中,针对复杂业务或者不想那么麻烦,可以直接上协程池。
常用协程池https://github.com/panjf2000/ants,如下实现。

代码上看起来简洁很多,根本原理和动态个数协程控制思路差不多,后续单开一篇文章讲讲协程池的实现。

// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
	p, _ := ants.NewPool(maxTaskNum)
	defer p.Release()

	var wg sync.WaitGroup

	for _ = range dataChan {
		wg.Add(1)

		// 提交任务,协程池动态管理数量,可以做更多的分配优化策略
		_ = p.Submit(func() {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		})

	}

	wg.Wait()
}
参考

演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/task_concurrency

https://zhuanlan.zhihu.com/p/568151296