sync.WaitGroupCountDownLatch

下面的例子,我们来编写一个并行任务,以并发的方式处理一个批次的任务,可以控制它的最大并发度,并在所有任务全部完成后返回一个结果。

import (
	"fmt"
	"sync"
	"time"
	"unicode/utf8"
)

func concurrentPrinting(tasks []string, maxConcurrent int) int {
	tokens := make(chan struct{}, maxConcurrent)
	results := make(chan int, len(tasks))
	var wg sync.WaitGroup
	wg.Add(len(tasks))

	for _, t := range tasks {
		tokens <- struct{}{}
		go func(str string) {
			time.Sleep(time.Second * 2)
			fmt.Println(str)
			results <- utf8.RuneCountInString(str)
			<-tokens
			wg.Done()
		}(t)
	}

	wg.Wait()
	close(results)
	var totalCount int
	for count := range results {
		totalCount += count
	}
	return totalCount
}

func main() {
	tasks := []string{"hello", "world", "golang", "foo", "bar", "1024", "2048"}
	totalCount := concurrentPrinting(tasks, 2)
	fmt.Printf("print %d chars", totalCount)
}
sync.WaitGroup

注意for循环中的运行在单独的goroutine中的匿名函数,如果在函数体中直接引用循环变量的话,拿到的可能全部都是最后一个变量,而作为函数的参数传递的则是正确地值。

WaitGroup
  1. wg.Add()
  2. go启动新的goroutine
  3. 在goroutine中完成任务时调用wg.Done()
  4. wg.Wait()等待所有goroutine运行结束

每一个并行任务都运行在独立的goroutine中,wg.Wait()也运行在独立的goroutine中,而主goroutine中range了一个channel,当这个channel关闭时,range结束。

tokens := make(chan struct{}, maxConcurrent)

整个逻辑的思路分析:

-1