背景
主要记录一下工作中和各个文档里关于golang并发开发的实践。golang并发主要用到了
Channel: 使用channel控制子协程,WaitGroup : 使用信号量机制控制子协程,Context: 使用上下文控制子协程。使用这三种机制中的一种或者多种可以达到并发控制很好的效果。关于这三个知识点,https://blog.csdn.net/LINZEYU666/article/details/123020597介绍的比较详细了,这里只介绍几个场景用法。
实践
waitGroup并发执行不同任务
这种写法适合并发处理少量case,并且funcA和funcB的作用是不同任务的时候。
wg := sync.WaitGroup{}
var result1 interface{}
var result2 interface{}
wg.Add(2)
go func() {
defer func() {
wg.Done()
}()
result1 = funcA()
}()
go func() {
defer func() {
wg.Done()
}()
result1 = funcB()
}()
wg.wait()
waitGroup并发执行相同任务
假设有一批url,需要并发去抓取,这个时候可能只是请求的地址不同,任务的函数是一致的。这时候可以使用for循环的方式去批量执行。使用该方法时候,需要使用线程安全的结构去做数据同步。此外下文的写法最大的弊端是没法做并发度控制,如果请求过多,容易把下游打满,以及启过多协程,浪费资源,只适合小数据集,
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
)
func main() {
idList := []string{"x", "xx"}
wg := sync.WaitGroup{}
dataMap := sync.Map{} // sync.Map是线程安全的,如果使用 map去存储返回结果会报错,
// 接受返回结果也可以是channel,channel也是线程安全的
for _, id := range idList {
wg.Add(1)
go func(id string) {
defer func() {
wg.Done()
}()
data := PostData(id)
dataMap.Store(id, data)
}(id)
}
wg.Wait()
for _, id := range idList {
fmt.Println(dataMap.Load(id))
}
}
func PostData(id string) string {
url := "http:xx"
method := "POST"
payload := strings.NewReader(fmt.Sprintf(`{"id":"%s"}`, id))
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
if err != nil {
fmt.Println(err)
return ""
}
req.Header.Add("Content-Type", "application/json")
res, err := client.Do(req)
if err != nil {
fmt.Println(err)
return ""
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println(err)
return ""
}
return string(body)
}
加入channnel,控制并发度
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
urls := []string{"http://a.com", "http://b.com", "http://c.com"}
// 控制并发度为2
concurrency := 2
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(url string) {
sem <- struct{}{} // 获取信号量
defer func() {
<-sem // 释放信号量
wg.Done()
}()
resp, err := http.Get(url)
if err != nil {
fmt.Printf("Error fetching %s: %v\n", url, err)
return
}
defer resp.Body.Close()
fmt.Printf("Fetched %s with status code %d\n", url, resp.StatusCode)
}(url)
}
wg.Wait()
fmt.Println("All URLs fetched")
}
使用channel进行并发编程
接下来,我们使用一个for循环来遍历URL切片,并为每个URL启动一个goroutine。在每个goroutine中,我们首先从并发度通道中获取一个信号,表示可以开始请求。然后,我们发送一个http GET请求,并将响应结果发送到结果通道中。最后,我们释放一个信号,表示请求已完成。
在主函数中,我们使用另一个for循环从结果通道中读取所有响应结果,并将它们打印到控制台上。
需要注意的是,我们在并发度通道中使用了一个空结构体{},因为我们只需要通道来控制并发度,而不需要在通道中传递任何数据。此外,我们还使用了通道的阻塞特性来控制并发度,因为当通道已满时,任何试图向通道中发送数据的操作都会被阻塞,直到有空间可用为止。
package main
import (
"fmt"
"net/http"
)
func main() {
urls := []string{"http://www.google.com", "http://www.facebook.com", "http://www.apple.com"}
// 创建一个通道来控制并发度为2
concurrency := make(chan struct{}, 2)
// 创建一个通道来接收响应结果
results := make(chan string, len(urls))
for _, url := range urls {
// 启动一个goroutine来请求url
go func(url string) {
// 从通道中获取一个信号,表示可以开始请求
concurrency <- struct{}{}
// 发送http GET请求
resp, err := http.Get(url)
if err != nil {
results <- fmt.Sprintf("%s -> error: %s", url, err)
} else {
results <- fmt.Sprintf("%s -> status: %s", url, resp.Status)
resp.Body.Close()
}
// 释放一个信号,表示请求已完成
<-concurrency
}(url)
}
// 从结果通道中读取所有响应结果
for i := 0; i < len(urls); i++ {
fmt.Println(<-results)
}
}
使用context,进行并发控制
这种机制下生成的goruntine是树形结构的,有依赖关系。
func getData(ctx context.Context, result chan string, id string) {
for {
select {
case <-ctx.Done():
fmt.Println("running get Data")
return
default:
resultData := PostData(id)
result <- resultData
}
}
}
func main() {
idList := []string{"xx", "xxx"}
ctx := context.Background()
var result = make(chan string, 2)
go getData(ctx, result, idList[0])
go getData(ctx, result, idList[1])
fmt.Println(<-result)
fmt.Println(<-result)
}
func PostData(id string) string {
url := "http://xxx"
method := "POST"
payload := strings.NewReader(fmt.Sprintf(`{"id":"%s"}`, id))
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
if err != nil {
fmt.Println(err)
return ""
}
req.Header.Add("Content-Type", "application/json")
res, err := client.Do(req)
if err != nil {
fmt.Println(err)
return ""
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println(err)
return ""
}
return string(body)
}