golang实现并发执行多个goroutine,并拿到每个goroutine执行结果,如果其中一个goroutine报错,则结束未执行的goroutine, 还可以设置超时,话不多说上代码
package main
import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
)
type Concurrent interface {
AddFunc(key string, f func(fRs chan interface{}, mErr chan error))
Run() (sync.Map, error)
}
type ConcurrentImp struct {
Err chan error // 接收错误信息
Rs sync.Map // 接收执行结果
taskNum uint32 // 执行任务数
finishedNum uint32 // 已经完成的goroutine
cancel context.CancelFunc
Ctx context.Context // 顶级context
//task map[string]func(k string) // 任务列表
task sync.Map
Finished chan int // 是否执行完成
Timer time.Duration // 设置超时
}
func NewConcurrentImp() *ConcurrentImp {
ctx, cancel := context.WithCancel(context.Background())
return &ConcurrentImp{
Ctx: ctx,
Err: make(chan error),
taskNum: 0,
finishedNum: 0,
Finished: make(chan int),
Timer: 30 * time.Second,
cancel: cancel,
//task: map[string]func(k string){},
//Rs: map[string]interface{}{},
}
}
func (c *ConcurrentImp) Run() (sync.Map, error) {
//执行任务, 非线程安全
//for k, task := range c.task {
// go task(k)
//}
c.task.Range(func(key, value any) bool {
k := key.(string)
f := value.(func(key string))
go f(k)
return true
})
// 执行监听
go c.listenTask(c.Ctx)
// 计时器
timer := time.After(c.Timer)
for {
select {
case err, ok := <-c.Err: // 子协程有错误
if ok {
c.cancel()
return c.Rs, err
}
case <-c.Finished: // 所有子程序都正常执行完成
return c.Rs, nil
case <-timer:
c.cancel()
return c.Rs, errors.New("超时")
default:
}
}
}
// 任务执行完成一个,数字减1
func (c *ConcurrentImp) listenTask(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("主协程退出,goroutine listen 退出")
return
default:
if c.finishedNum == c.taskNum && c.finishedNum != 0 {
c.Finished <- 1
return
}
}
}
}
func (c *ConcurrentImp) AddFunc(key string, f func(fRs chan interface{}, mErr chan error)) {
//c.task[key] = c.taskFunc(c.Ctx, f)
c.task.Store(key, c.taskFunc(c.Ctx, f))
atomic.AddUint32(&c.taskNum, 1)
}
// 讲方法放入任务列表
func (c *ConcurrentImp) taskFunc(ctx context.Context, f func(fRs chan interface{}, mErr chan error)) func(key string) {
return func(key string) {
rs := make(chan interface{})
go f(rs, c.Err)
for {
select {
case <-ctx.Done():
fmt.Println("主协程退出,goroutine " + key + "退出")
return
case r := <-rs:
//c.Rs[key] = r
c.Rs.Store(key, r)
atomic.AddUint32(&c.finishedNum, 1)
return
default:
}
}
}
}
func main() {
t := time.Now()
var ct Concurrent
ct = NewConcurrentImp()
ct.AddFunc("task1", func(fRs chan interface{}, mErr chan error) {
fRs <- 3
})
ct.AddFunc("task2", func(fRs chan interface{}, mErr chan error) {
//time.Sleep(35 * time.Second)
fRs <- 1
})
ct.AddFunc("task3", func(fRs chan interface{}, mErr chan error) {
fRs <- 2
})
ct.AddFunc("task4", func(fRs chan interface{}, mErr chan error) {
fRs <- 5
//mErr <- errors.New("test err")
})
if rs, err := ct.Run(); err != nil {
log.Fatalln(err)
} else {
fmt.Println(rs)
}
fmt.Println("program cost time:", time.Now().Sub(t))
}
benchmark
func BenchmarkNewConcurrentImpWithoutPool(b *testing.B) {
var ct Concurrent
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10000; j++ {
ct = NewConcurrentImp()
ct.AddFunc("task1", func(fRs chan interface{}, mErr chan error) {
fRs <- 3
})
ct.AddFunc("task2", func(fRs chan interface{}, mErr chan error) {
//time.Sleep(35 * time.Second)
fRs <- 1
})
ct.AddFunc("task3", func(fRs chan interface{}, mErr chan error) {
fRs <- 2
})
ct.AddFunc("task4", func(fRs chan interface{}, mErr chan error) {
fRs <- 5
//mErr <- errors.New("test err")
})
ct.Run()
}
}
}
sync.pool
// Sync.pool实现性能提升
var concurrentPool = sync.Pool{
New: func() interface{} { return NewConcurrentImp() },
}
func BenchmarkNewConcurrentImpWithPool(b *testing.B) {
var ct Concurrent
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < 10000; j++ {
ct = concurrentPool.Get().(Concurrent)
ct.AddFunc("task1", func(fRs chan interface{}, mErr chan error) {
fRs <- 3
})
ct.AddFunc("task2", func(fRs chan interface{}, mErr chan error) {
//time.Sleep(35 * time.Second)
fRs <- 1
})
ct.AddFunc("task3", func(fRs chan interface{}, mErr chan error) {
fRs <- 2
})
ct.AddFunc("task4", func(fRs chan interface{}, mErr chan error) {
fRs <- 5
//mErr <- errors.New("test err")
})
ct.Run()
}
}
}
执行go test -bench=.
结果相差不大
goos: linux
goarch: amd64
pkg: demo
cpu: 12th Gen Intel(R) Core(TM) i5-12400
BenchmarkNewConcurrentImpWithoutPool-12 1 2545835033 ns/op 26417840 B/op 651565 allocs/op
BenchmarkNewConcurrentImpWithPool-12 1 2526898684 ns/op 26224176 B/op 650405 allocs/op
PASS
ok demo 5.076s