有这么个需求:
1、调用多个服务(pack1、pack2、pack3…),需要并行调用;
2、任意一个服务返回失败,就提前返回失败;
3、需要有超时控制,不能等待太久;
4、收集每个服务返回的结果,后面要做处理。
怎么实现呢?
package main
import (
"code.byted.org/gopkg/logs"
"fmt"
"github.com/pkg/errors"
"time"
)
// pack1 simulates a slow downstream call that succeeds.
//
// secs is treated as a whole number of seconds: it is multiplied by
// time.Second before sleeping, so callers pass plain counts such as pack1(1).
// It returns a message describing the wait and a nil error.
func pack1(secs time.Duration) (result string, err error) {
	// secs is a count rather than a real duration; format it as an integer so
	// the output reads "1" instead of "1ns".
	n := int64(secs)
	fmt.Printf("waiting %d\n", n)
	time.Sleep(secs * time.Second)
	result = fmt.Sprintf("waited for %d seconds", n)
	// To simulate a failure, return errors.New("pack1 error") here instead.
	return result, nil
}
// pack2 simulates a slow downstream call that fails.
//
// secs is treated as a whole number of seconds: it is multiplied by
// time.Second before sleeping, so callers pass plain counts such as pack2(5).
// It always returns a "pack2 error" error together with the wait message.
func pack2(secs time.Duration) (result string, err error) {
	// secs is a count rather than a real duration; format it as an integer so
	// the output reads "5" instead of "5ns".
	n := int64(secs)
	fmt.Printf("waiting %d\n", n)
	time.Sleep(secs * time.Second)
	result = fmt.Sprintf("waited for %d seconds", n)
	// To simulate success, return (result, nil) here instead.
	return result, errors.New("pack2 error")
}
// runParallel runs pack1 and pack2 concurrently and combines their results.
//
// It returns early with an error as soon as either pack fails, or when both
// results have not arrived within 2 seconds in total. On success it returns
// both results formatted into a single string.
//
// The packs cannot be cancelled, so after an early return the remaining
// worker still runs to completion; it just no longer blocks on delivery.
func runParallel() (string, error) {
	defer logs.Flush()

	// Buffered with capacity 1 so each worker can always deliver its single
	// result and exit, even if this function has already returned.
	outChan1 := make(chan *string, 1)
	outChan2 := make(chan *string, 1)

	go func() {
		res1, err := pack1(1)
		if err != nil {
			logs.Error("pack1 error")
			outChan1 <- nil // nil signals failure to the collector
			return
		}
		outChan1 <- &res1
	}()
	go func() {
		res2, err := pack2(5)
		if err != nil {
			logs.Error("pack2 error")
			outChan2 <- nil // nil signals failure to the collector
			return
		}
		outChan2 <- &res2
	}()

	// A single timer bounds the whole wait. Creating a fresh time.After inside
	// the loop would restart the 2s window after every received result.
	timeout := time.NewTimer(2 * time.Second)
	defer timeout.Stop()

	var out1, out2 *string
	for finishCnt := 0; finishCnt < 2; finishCnt++ {
		select {
		case out1 = <-outChan1:
			if out1 == nil {
				return "", errors.New("pack1 return nil, error")
			}
		case out2 = <-outChan2:
			if out2 == nil {
				return "", errors.New("pack2 return nil, error")
			}
		case <-timeout.C:
			return "", errors.New("wait 2s res timeout")
		}
	}

	// Each channel carries exactly one value, so two successful receives mean
	// one from each worker: both out1 and out2 are non-nil here.
	return fmt.Sprintf("out1: %s, out2: %s", *out1, *out2), nil
}
// main times a single runParallel call, prints its result and the elapsed
// time, then sleeps for another 10 seconds before exiting.
func main() {
	// Sequential baseline, currently disabled:
	//   t0 := time.Now()
	//   runNotParallel()
	//   fmt.Println("no parallel: ", t1.Sub(t0))
	started := time.Now()
	out, runErr := runParallel()
	fmt.Println("res:", out, runErr)
	elapsed := time.Since(started)
	fmt.Println("parallel: ", elapsed)
	time.Sleep(10 * time.Second)
}
看上去好像是ok的。测了一些case也没问题。求大神们指出是否有bug
还有一种是用 sync.Map 实现,应该也可以:
package main
import (
"code.byted.org/gopkg/logs"
"fmt"
"sync"
"time"
)
// runParallelSyncMap runs pack1 and pack2 concurrently and joins their results.
//
// Each worker stores its result in a sync.Map under its own key, or signals
// errorChan when its pack fails. The call returns an error when
// SyncWaitTimeout reports a failure or the 3-second timeout, or when a
// result is missing afterwards. Otherwise it returns the results joined in
// pack1, pack2 order.
func runParallelSyncMap() (string, error) {
	defer logs.Flush()

	const timeout = 3 * time.Second

	var (
		wgBegin sync.WaitGroup
		synMap  sync.Map
	)
	// One buffer slot per worker, so a failing worker never blocks on the send
	// (and therefore always reaches wgBegin.Done), even when both fail or the
	// waiter has already given up.
	errorChan := make(chan struct{}, 2)

	wgBegin.Add(2)
	go func() {
		defer wgBegin.Done()
		res1, err := pack1(1)
		if err != nil {
			logs.Error("pack1 error")
			errorChan <- struct{}{}
			return
		}
		synMap.Store("pack1", res1)
	}()
	go func() {
		defer wgBegin.Done()
		res2, err := pack2(2)
		if err != nil {
			logs.Error("pack2 error")
			errorChan <- struct{}{}
			return
		}
		synMap.Store("pack2", res2)
	}()

	// A false result means a worker failed or the timeout elapsed; either way
	// the partial results must not be reported as success.
	if !SyncWaitTimeout(&wgBegin, timeout, errorChan) {
		return "", fmt.Errorf("parallel packs failed or did not finish within %v", timeout)
	}

	// Load by key in a fixed order: sync.Map.Range gives no ordering guarantee,
	// and a missing key means that worker failed even though the wait finished.
	res := ""
	for _, key := range []string{"pack1", "pack2"} {
		v, ok := synMap.Load(key)
		if !ok {
			return "", fmt.Errorf("%s returned no result", key)
		}
		res += v.(string) + " +++ "
	}
	return fmt.Sprintf("out: %s", res), nil
}
// main times a single runParallelSyncMap call, prints its result and the
// elapsed time, then sleeps for another 10 seconds before exiting.
func main() {
	// Sequential baseline, currently disabled:
	//   t0 := time.Now()
	//   runNotParallel()
	//   fmt.Println("no parallel: ", t1.Sub(t0))
	started := time.Now()
	out, runErr := runParallelSyncMap()
	fmt.Println("res:", out, runErr)
	elapsed := time.Since(started)
	fmt.Println("parallel: ", elapsed)
	time.Sleep(10 * time.Second)
}
// SyncWaitTimeout waits until wg completes, a signal arrives on errorChan, or
// timeout elapses, whichever happens first.
//
// It returns true only when wg completed and no error signal is pending. It
// returns false on an error signal or on timeout.
//
// The helper goroutine that calls wg.Wait keeps running until wg completes,
// so callers should make sure every worker eventually calls Done.
func SyncWaitTimeout(wg *sync.WaitGroup, timeout time.Duration, errorChan chan struct{}) bool {
	defer logs.Flush()

	finish := make(chan struct{})
	go func() {
		defer close(finish)
		wg.Wait()
	}()

	// An explicit timer can be stopped on return instead of lingering until it
	// fires, as a time.After timer may.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-finish:
		// select chooses randomly among ready cases, so an error signal may be
		// pending alongside completion (e.g. with a buffered errorChan). Give
		// the error precedence so a failure is never reported as success.
		select {
		case <-errorChan:
			logs.Error("errorChan")
			return false
		default:
			return true
		}
	case <-errorChan:
		logs.Error("errorChan")
		return false
	case <-timer.C:
		logs.Error("timeout")
		return false
	}
}
参考