一、前言

goroutinegoroutinegogo

1、goroutine的错误处理

sync.ErrGroupsync.WaitGroupgoroutine

具体的大家可以百度学习下。errGroup

2、goroutine的处理结果

goroutinechannelsync.WaitGroupcontextchannelchannel
channelsync.Mapgoroutinesync.Map

二、控制goroutine数量以及获取处理结果

1、实战代码

goroutine
//定义要获取的返回值,成功数量,失败数量,失败id集合
succeededNums, failedNums:= 0, 0, 
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{}
//控制goroutine数量,保证同时只有10个goroutine
chan1 := make(chan struct{}, 10) 
for {
   //业务逻辑
   // ....
   //goroutine加速,chan1写满,则阻塞。等待之前的goroutine释放才能继续循环
   chan1 <- struct{}{}
   wg.Add(1)
   go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
      defer func() {
         if err1 := recover(); err1 != nil { //产生了panic异常
            log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
         }
         wg.Done() //每个goroutine执行完毕则释放
         return
      }()
      bulkRequest := esClient.Bulk()
      //ES使用bulk方法批量刷新数据
      bulkResByAssetInfo := new(EsBulkDataRes)
      bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
      if err != nil {
         log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
         return
      }
      //累加执行结果到sync.map,保证并发安全
      tempMap := make(map[string]interface{})
      tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
      tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
      tempMap["errData"] = bulkResByAssetInfo.ErrData
      //每次取循环的的最大id,作为syncMap的key
      syncMap.Store(test[nums-1].ID, tempMap)
      //执行完毕再释放channel
      <-chan1
   }(ctx, targetIndexName, esClient)
  
}
wg.Wait()
//刷新结束,写入通知,通知内容包括,遍历sync.map,获取返回值
syncMap.Range(func(key, value interface{}) bool {
   val := value.(map[string]interface{})
   succeededNums += val["successNums"].(int)
   failedNums += val["failedNums"].(int)
   errData += val["errData"].(string)
   return true
})

//打印结果
fmt.Println("成功数量:",succeededNums)
fmt.Println("失败数量:",failedNums)
fmt.Println("失败id:",errData)

2、syncMap的使用

(1)写入处理结果到map
(2) 写入map到sync.Map中,注意key不要重复
(3)使用Range来循环sync.Map,获取处理结果,并累加

3、控制goroutine的数量

channel
(1)设定channel长度,循环开始每生成一个goroutine则写入一次channel
(2) channel写满则阻塞
(3)goroutine执行完毕,释放channel
(4) for循环中继续写入channel,保证同时执行的goroutine只有10个

三、sync.Map的缺点

valueinterface{}
sync.Mapsync.MapcurrentMaphash

end