为什么需要协程池?
是否可以无限制地创建 goroutine?
创建 goroutine 需要占用一定的内存,但开销很小:每个 goroutine 的初始栈只有几 KB,这也是 Go 能支撑百万级长连接的原因。
但在实际使用中,goroutine 必须被正确地关闭;如果无限制地创建而不让它们退出,就会造成 goroutine 泄露,持续消耗内存和调度资源,最终可能导致系统崩溃。因此需要用协程池来限制同时运行的 goroutine 数量,并复用这些 goroutine。
简单协程池实现
worker.go
package workpool
import (
"fmt"
"log"
)
// Job is a unit of work run by a pool worker: a function with no arguments
// and no results.
type Job func()
// worker is one pool goroutine. Each time it becomes idle it sends itself on
// the shared workers channel, then waits for either a job or a stop signal.
type worker struct {
workers chan *worker // shared idle-worker channel; the worker registers itself here when ready
JobArrive chan Job // used by dispatch to hand a Job to this particular worker
stop chan struct{} // a receive here makes the worker's goroutine exit
}
// newWorker allocates a worker bound to the shared idle-worker channel works.
// Its job and stop channels are unbuffered. The worker does nothing until
// start is called.
func newWorker(works chan *worker) *worker {
	w := new(worker)
	w.workers = works
	w.JobArrive = make(chan Job)
	w.stop = make(chan struct{})
	return w
}
// start launches the worker's goroutine. The goroutine loops forever:
//  1. it announces that it is idle by sending itself on w.workers;
//  2. it waits for a job on w.JobArrive, or a signal on w.stop.
//
// Jobs are executed through runJob, so a panicking job does not kill the
// worker. On a stop signal it prints "worker exit" and returns.
func (w *worker) start() {
	go func() {
		for {
			w.workers <- w
			select {
			case <-w.stop:
				fmt.Println("worker exit")
				return
			case next := <-w.JobArrive:
				runJob(next)
			}
		}
	}()
}
// runJob calls f. If f panics, the panic is recovered and logged, so the
// calling worker goroutine keeps running.
func runJob(f func()) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Printf("gpool run job panic err: %v", r)
	}()
	f()
}
dispatch.go
package workpool
// dispatcher moves jobs from the job queue to idle workers one at a time, and
// retires all workers when it receives on stop.
type dispatcher struct {
workers chan *worker // idle workers; its capacity is the number of workers newDispatcher starts
jobQueue chan Job // pending jobs submitted through WorkerPool
stop chan struct{} // a receive here stops every worker and ends the dispatch loop
}
// newDispatcher starts cap(workers) worker goroutines that share the workers
// channel. It then starts the dispatch loop reading from jobQueue and returns
// the running dispatcher.
func newDispatcher(workers chan *worker, jobQueue chan Job) *dispatcher {
	d := &dispatcher{stop: make(chan struct{}), jobQueue: jobQueue, workers: workers}
	// One worker per slot of the idle-worker channel, so every worker can
	// park itself there without blocking.
	for n := cap(workers); n > 0; n-- {
		newWorker(workers).start()
	}
	go d.dispatch()
	return d
}
// dispatch is the dispatcher's main loop.
//
// For each queued job it blocks until some worker is idle, then hands the job
// to that worker. A stop signal is not observed while it is waiting for an
// idle worker.
//
// On stop, it takes each of the cap(d.workers) workers from the idle channel
// as they become free, tells each one to exit, and returns.
func (d *dispatcher) dispatch() {
	for {
		select {
		case <-d.stop:
			for left := cap(d.workers); left > 0; left-- {
				idle := <-d.workers
				idle.stop <- struct{}{}
			}
			return
		case next := <-d.jobQueue:
			idle := <-d.workers
			idle.JobArrive <- next
		}
	}
}
workerPool.go
package workpool
import (
"sync"
"time"
)
// WorkerPool runs submitted jobs on a fixed set of worker goroutines.
// It contains a sync.WaitGroup, so it must not be copied; use the *WorkerPool
// returned by NewWorkerPool.
type WorkerPool struct {
jobQueue chan Job // pending (wrapped) jobs, shared with the dispatcher; capacity is jobQueueLen
dispatcher *dispatcher // hands queued jobs to idle workers
wg sync.WaitGroup // counts accepted jobs that have not finished yet; Shutdown waits on it
}
// NewWorkerPool starts a pool of numWorkers worker goroutines. They are fed
// from a job queue that buffers up to jobQueueLen pending jobs.
//
// A numWorkers value below 1 is raised to 1. With zero workers the dispatcher
// would wait forever for an idle worker, so every job and Shutdown would hang;
// a negative value would make make() panic. A negative jobQueueLen is treated
// as 0: the queue is then unbuffered and each send waits for the dispatcher.
func NewWorkerPool(numWorkers int, jobQueueLen int) *WorkerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}
	if jobQueueLen < 0 {
		jobQueueLen = 0
	}
	workers := make(chan *worker, numWorkers)
	jobQueue := make(chan Job, jobQueueLen)
	return &WorkerPool{
		jobQueue:   jobQueue,
		dispatcher: newDispatcher(workers, jobQueue),
	}
}
// RemainJob reports how many submitted jobs are still buffered in the job
// queue. Jobs the dispatcher has already taken from the queue, whether
// waiting for a worker or running, are not counted.
func (p *WorkerPool) RemainJob() int {
	queued := len(p.jobQueue)
	return queued
}
// SendJob submits job to the pool, blocking while the job queue is full.
// The job is counted in the pool's WaitGroup before it is enqueued, so
// Shutdown will wait for it to finish.
func (p *WorkerPool) SendJob(job func()) {
	wrapped := p.wrapJob(job)
	p.wg.Add(1)
	p.jobQueue <- wrapped
}
// SendJobWithTimeout submits job to the pool, waiting at most t for room in
// the job queue. It reports whether the job was accepted. When it returns
// false, the job was dropped and will never run.
func (p *WorkerPool) SendJobWithTimeout(job func(), t time.Duration) bool {
	// Count the job before it becomes visible to the dispatcher. A worker may
	// run it and call wg.Done immediately; a Done that precedes its Add drives
	// the counter negative and panics.
	p.wg.Add(1)
	wrapped := p.wrapJob(job)

	// Fast path: accept immediately if the queue has room. Otherwise, with a
	// zero or negative t, select would pick randomly between the already-fired
	// timer and the ready send.
	select {
	case p.jobQueue <- wrapped:
		return true
	default:
	}

	timer := time.NewTimer(t)
	defer timer.Stop()
	select {
	case p.jobQueue <- wrapped:
		return true
	case <-timer.C:
		// Never enqueued: undo the Add above so Shutdown does not wait for it.
		p.wg.Done()
		return false
	}
}
// SendJobWithDeadline submits job to the pool, waiting until the deadline t
// at the latest for room in the job queue. It reports whether the job was
// accepted. When it returns false, the job was dropped and will never run.
//
// If t has already passed, only an immediate, non-blocking attempt is made;
// the call no longer waits an extra second past the deadline.
func (p *WorkerPool) SendJobWithDeadline(job func(), t time.Time) bool {
	// Count the job before it becomes visible to the dispatcher. Otherwise a
	// worker could call wg.Done before this Add and panic the WaitGroup.
	p.wg.Add(1)
	wrapped := p.wrapJob(job)

	// Fast path: accept immediately if the queue has room.
	select {
	case p.jobQueue <- wrapped:
		return true
	default:
	}

	wait := time.Until(t)
	if wait <= 0 {
		// The deadline has passed and the queue is full: reject now.
		p.wg.Done()
		return false
	}

	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case p.jobQueue <- wrapped:
		return true
	case <-timer.C:
		// Never enqueued: undo the Add above so Shutdown does not wait for it.
		p.wg.Done()
		return false
	}
}
// wrapJob returns a closure that runs job and then marks it finished in the
// pool's WaitGroup. Done is deferred, so the job counts as finished even if
// it panics; the panic itself is left for the caller (runJob) to recover.
func (p *WorkerPool) wrapJob(job func()) func() {
	wrapped := func() {
		defer p.wg.Done()
		job()
	}
	return wrapped
}
// Shutdown waits until every accepted job has finished, then tells the
// dispatcher to stop all workers. It returns as soon as the dispatcher has
// received the stop signal; workers may still be exiting at that point.
//
// Call it at most once. A second call blocks forever, because the dispatcher
// has already returned and nothing receives on its stop channel.
// NOTE(review): submitting jobs concurrently with Shutdown is not safe with
// this design — confirm callers stop submitting first.
func (p *WorkerPool) Shutdown() {
	p.wg.Wait()
	var signal struct{}
	p.dispatcher.stop <- signal
}
测试(以下代码属于 main 包,需要导入上面的 workpool 包):
import (
"fmt"
"time"
)
// main demonstrates the pool: two workers, a queue of length two, five slow
// jobs, and a submission that is expected to time out.
func main() {
	pool := workpool.NewWorkerPool(2, 2)

	// Submit five 4-second jobs. With 2 workers and a queue of length 2, two
	// run at once, one is held by the dispatcher until a worker frees up, and
	// two wait in the queue.
	for i := 1; i <= 5; i++ {
		id := i // per-iteration copy for the closure (required before Go 1.22)
		pool.SendJob(func() {
			fmt.Printf("job%d exec start\n", id)
			time.Sleep(4 * time.Second)
			fmt.Printf("job%d exec end\n", id)
		})
	}
	fmt.Printf("job remain:%d\n", pool.RemainJob())

	// The queue should still be full here, so this is expected to time out.
	sendOK := pool.SendJobWithTimeout(func() {
		fmt.Println("SendJobWithTimeout")
	}, 2*time.Second)
	if !sendOK {
		fmt.Println("worker pool is too busy")
	}

	pool.Shutdown()

	// Give the workers a moment to print "worker exit". Do not block on an
	// empty select{} here: once every worker goroutine has exited, the runtime
	// aborts with "all goroutines are asleep - deadlock!".
	time.Sleep(100 * time.Millisecond)
}
使用协程池
已有不少第三方库实现了协程池,例如: