话不多说, 先上测试数据, 在各种负载下均有良好表现:
// small task
const (
PoolSize = 16
BenchTimes = 1000
N = 1000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 3302 357841 ns/op 55977 B/op 2053 allocs/op
BenchmarkGopool
BenchmarkGopool-8 4426 319383 ns/op 20000 B/op 1173 allocs/op
BenchmarkAnts
BenchmarkAnts-8 3026 399899 ns/op 16047 B/op 1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8 4314 259668 ns/op 48028 B/op 3000 allocs/op
PASS
// medium task
const (
PoolSize = 16
BenchTimes = 1000
N = 10000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 1491 808853 ns/op 57635 B/op 2008 allocs/op
BenchmarkGopool
BenchmarkGopool-8 1377 870051 ns/op 17266 B/op 1029 allocs/op
BenchmarkAnts
BenchmarkAnts-8 886 1324236 ns/op 16054 B/op 1001 allocs/op
BenchmarkNbio
BenchmarkNbio-8 1324 836092 ns/op 48000 B/op 3000 allocs/op
PASS
// large task
const (
PoolSize = 16
BenchTimes = 1000
N = 100000
)
goos: darwin
goarch: arm64
pkg: bench
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8 193 6026196 ns/op 58162 B/op 2004 allocs/op
BenchmarkGopool
BenchmarkGopool-8 178 6942255 ns/op 17108 B/op 1019 allocs/op
BenchmarkAnts
BenchmarkAnts-8 174 6300705 ns/op 16157 B/op 1002 allocs/op
BenchmarkNbio
BenchmarkNbio-8 176 7084957 ns/op 48071 B/op 2995 allocs/op
PASS
测试代码 Benchmark
代码实现
package bench
import (
"sync"
)
type (
WorkerQueue struct {
mu *sync.Mutex // 锁
q []Job // 任务队列
maxConcurrency int32 // 最大并发
curConcurrency int32 // 当前并发
}
Job func()
)
// NewWorkerQueue 创建一个任务队列
func NewWorkerQueue(maxConcurrency int32) *WorkerQueue {
return &WorkerQueue{
mu: &sync.Mutex{},
maxConcurrency: maxConcurrency,
curConcurrency: 0,
}
}
// 获取一个任务
func (c *WorkerQueue) getJob(delta int32) Job {
c.mu.Lock()
defer c.mu.Unlock()
c.curConcurrency += delta
if c.curConcurrency >= c.maxConcurrency {
return nil
}
if n := len(c.q); n == 0 {
return nil
}
var result = c.q[0]
c.q = c.q[1:]
c.curConcurrency++
return result
}
// 递归地执行任务
func (c *WorkerQueue) do(job Job) {
job()
if nextJob := c.getJob(-1); nextJob != nil {
go c.do(nextJob)
}
}
// Push 追加任务, 有资源空闲的话会立即执行
func (c *WorkerQueue) Push(job Job) {
c.mu.Lock()
c.q = append(c.q, job)
c.mu.Unlock()
if item := c.getJob(0); item != nil {
go c.do(item)
}
}
如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)