sync.WaitGroupCountDownLatch
下面的例子,我们来编写一个并行任务,以并发的方式处理一个批次的任务,可以控制它的最大并发度,并在所有任务全部完成后返回一个结果。
import (
"fmt"
"sync"
"time"
"unicode/utf8"
)
func concurrentPrinting(tasks []string, maxConcurrent int) int {
tokens := make(chan struct{}, maxConcurrent)
results := make(chan int, len(tasks))
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, t := range tasks {
tokens <- struct{}{}
go func(str string) {
time.Sleep(time.Second * 2)
fmt.Println(str)
results <- utf8.RuneCountInString(str)
<-tokens
wg.Done()
}(t)
}
wg.Wait()
close(results)
var totalCount int
for count := range results {
totalCount += count
}
return totalCount
}
func main() {
tasks := []string{"hello", "world", "golang", "foo", "bar", "1024", "2048"}
totalCount := concurrentPrinting(tasks, 2)
fmt.Printf("print %d chars", totalCount)
}
sync.WaitGroup
注意for循环中的运行在单独的goroutine中的匿名函数,如果在函数体中直接引用循环变量的话,拿到的可能全部都是最后一个变量,而作为函数的参数传递的则是正确地值。
WaitGroup
- wg.Add()
- go启动新的goroutine
- 在goroutine中完成任务时调用wg.Done()
- wg.Wait()等待所有goroutine运行结束
每一个并行任务都运行在独立的goroutine中,wg.Wait()也运行在独立的goroutine中,而主goroutine中range了一个channel,当这个channel关闭时,range结束。
tokens := make(chan struct{}, maxConcurrent)
整个逻辑的思路分析:
-1