是否需要使用协程池?
是否可以无限制地创建 goroutine?

创建 goroutine 需要占用一定的内存,但开销很小:每个 goroutine 的初始栈只有几 KB,这也是 Go 能够支撑百万级长连接的原因.

但在实际使用中,goroutine 需要被正确地关闭;如果无限制地创建而不回收,就会造成 goroutine 泄露,进而耗尽内存、引发系统崩溃. 协程池通过固定数量的 worker goroutine 复用执行任务,从而限制并发数量,避免这一问题.

简单协程池实现

worker.go

package workpool

import (
    "fmt"
    "log"
)

// Job is a unit of work submitted to the pool and executed by a worker.
type Job func()

// worker is one pool goroutine. Each time it becomes idle it sends itself on
// workers, then waits for either a Job on JobArrive or a signal on stop.
type worker struct {
    workers   chan *worker  // shared idle-worker channel; the worker registers itself here when ready
    JobArrive chan Job // used by dispatch to hand a Job to this particular worker
    stop      chan struct{} // a receive here makes the worker goroutine exit
}

// newWorker builds an idle worker that will register itself on works.
// JobArrive and stop are unbuffered, so every hand-off to the worker is a
// rendezvous with its goroutine. The worker is not running until start is
// called.
func newWorker(works chan *worker) *worker {
    w := new(worker)
    w.workers = works
    w.JobArrive = make(chan Job)
    w.stop = make(chan struct{})
    return w
}

// start launches the worker's goroutine. On every iteration the worker first
// advertises itself as idle, then waits for either a job to run or a stop
// signal. A panicking job is recovered by runJob, so one bad job does not
// kill the worker.
func (w *worker) start() {
    go func() {
        for {
            // Becoming available blocks until the dispatcher (or the
            // shutdown loop) takes this worker off the idle channel.
            w.workers <- w

            select {
            case <-w.stop:
                fmt.Println("worker exit")
                return
            case job := <-w.JobArrive:
                runJob(job)
            }
        }
    }()
}

// runJob executes f and recovers any panic it raises, logging the panic value
// instead of letting it crash the calling worker goroutine.
func runJob(f func()) {
    defer func() {
        r := recover()
        if r == nil {
            return
        }
        log.Printf("gpool run job panic err: %v", r)
    }()

    f()
}

dispatch.go

package workpool

// dispatcher owns the goroutine that moves jobs from jobQueue to idle workers
// and, on request, stops all workers.
type dispatcher struct {
    workers  chan *worker  // idle-worker channel; its capacity is the number of workers newDispatcher starts
    jobQueue chan Job      // jobs submitted by the WorkerPool
    stop     chan struct{} // a send here makes dispatch stop every worker and return
}

// newDispatcher starts one worker per slot of the workers channel's capacity,
// then launches the dispatch loop in its own goroutine. The returned
// dispatcher is already running.
func newDispatcher(workers chan *worker, jobQueue chan Job) *dispatcher {
    d := &dispatcher{workers: workers, jobQueue: jobQueue, stop: make(chan struct{})}

    for remaining := cap(workers); remaining > 0; remaining-- {
        newWorker(workers).start()
    }

    go d.dispatch()
    return d
}

// dispatch is the dispatcher's main loop.
//
// For every job taken from jobQueue it waits for an idle worker and hands the
// job over. When a stop signal arrives, it collects each of the
// cap(d.workers) workers as they become idle, tells each one to exit, and
// returns.
func (d *dispatcher) dispatch() {
    for {
        select {
        case <-d.stop:
            for remaining := cap(d.workers); remaining > 0; remaining-- {
                idle := <-d.workers
                idle.stop <- struct{}{}
            }
            return
        case job := <-d.jobQueue:
            // While waiting here for a free worker, this loop cannot observe
            // d.stop; Shutdown avoids that by draining jobs first.
            idle := <-d.workers
            idle.JobArrive <- job
        }
    }
}

workerPool.go

package workpool

import (
    "sync"
    "time"
)

// WorkerPool runs submitted jobs on a fixed set of worker goroutines.
// Construct it with NewWorkerPool: the zero value has a nil job queue and no
// dispatcher, so it cannot accept or run jobs.
type WorkerPool struct {
    jobQueue   chan Job       // buffered queue of wrapped jobs, read by the dispatcher
    dispatcher *dispatcher    // routes queued jobs to idle workers
    wg         sync.WaitGroup // counts submitted jobs that have not finished; Shutdown waits on it
}

// NewWorkerPool creates a pool with numWorkers worker goroutines and a job
// queue that buffers up to jobQueueLen pending jobs. The workers and the
// dispatcher are started before it returns.
//
// numWorkers below 1 is raised to 1. With zero workers the dispatcher would
// take the first job and block forever waiting for an idle worker, and a
// negative value would make make() panic. A negative jobQueueLen is treated
// as 0 (an unbuffered queue).
func NewWorkerPool(numWorkers int, jobQueueLen int) *WorkerPool {
    if numWorkers < 1 {
        numWorkers = 1
    }
    if jobQueueLen < 0 {
        jobQueueLen = 0
    }

    workers := make(chan *worker, numWorkers)
    jobQueue := make(chan Job, jobQueueLen)

    return &WorkerPool{
        jobQueue:   jobQueue,
        dispatcher: newDispatcher(workers, jobQueue),
    }
}

// RemainJob reports how many submitted jobs are still sitting in the queue.
// Jobs that are running, or held by the dispatcher while it waits for an idle
// worker, are not counted.
func (p *WorkerPool) RemainJob() int {
    queued := len(p.jobQueue)
    return queued
}

// SendJob submits job to the pool, blocking until the queue has room.
// The job is counted in the WaitGroup before it is enqueued, so a worker
// finishing it can never decrement the counter first.
func (p *WorkerPool) SendJob(job func()) {
    wrapped := p.wrapJob(job)
    p.wg.Add(1)
    p.jobQueue <- wrapped
}

// SendJobWithTimeout submits job, waiting at most t for room in the job
// queue. It reports whether the job was enqueued; when it returns false the
// job will never run.
//
// The WaitGroup is incremented before attempting the send. Once the job is
// in the queue a worker may run it, and call wg.Done, right away. Adding
// afterwards could drive the counter negative (a panic) or let Shutdown's
// Wait return while the job is still pending. On timeout the provisional Add
// is undone.
func (p *WorkerPool) SendJobWithTimeout(job func(), t time.Duration) bool {
    p.wg.Add(1)

    // An explicit timer (instead of time.After) is released as soon as the
    // send succeeds.
    timer := time.NewTimer(t)
    defer timer.Stop()

    select {
    case p.jobQueue <- p.wrapJob(job):
        return true
    case <-timer.C:
        p.wg.Done()
        return false
    }
}

// SendJobWithDeadline submits job, waiting until at most time t for room in
// the job queue. It reports whether the job was enqueued; when it returns
// false the job will never run.
//
// If t has already passed, a single non-blocking attempt is made. The job is
// accepted only if the queue has room right now; the call never waits beyond
// the deadline.
//
// As in SendJobWithTimeout, the WaitGroup is incremented before the send,
// because a worker could otherwise finish the job and call wg.Done before the
// Add. The Add is undone if the job is not enqueued.
func (p *WorkerPool) SendJobWithDeadline(job func(), t time.Time) bool {
    p.wg.Add(1)
    wrapped := p.wrapJob(job)

    remaining := time.Until(t)
    if remaining <= 0 {
        select {
        case p.jobQueue <- wrapped:
            return true
        default:
            p.wg.Done()
            return false
        }
    }

    timer := time.NewTimer(remaining)
    defer timer.Stop()

    select {
    case p.jobQueue <- wrapped:
        return true
    case <-timer.C:
        p.wg.Done()
        return false
    }
}

// wrapJob returns a closure that runs job and then marks it finished in the
// pool's WaitGroup. Because Done is deferred, it runs even if job panics.
func (p *WorkerPool) wrapJob(job func()) func() {
    wrapped := func() {
        defer p.wg.Done()
        job()
    }
    return wrapped
}

// Shutdown blocks until every submitted job has finished, then tells the
// dispatcher to stop all workers.
//
// It returns as soon as the dispatcher has received the stop signal, not
// when the workers have actually exited. The stop channel is unbuffered and
// nothing receives on it once the dispatcher has returned, so calling
// Shutdown a second time blocks forever.
func (p *WorkerPool) Shutdown() {
    // Drain first, so no queued or running job is abandoned.
    p.wg.Wait()

    stop := p.dispatcher.stop
    stop <- struct{}{}
}

测试 (属于 main 包,需要额外导入 workpool 包):

import (
    "fmt"
    "time"
)

// main demonstrates the pool: two workers, a queue of two, five slow jobs,
// then a timed submission that is expected to fail while the pool is busy.
func main() {
    pool := workpool.NewWorkerPool(2, 2)

    // Five identical 4-second jobs. With only 2 workers and a 2-slot queue,
    // some of these SendJob calls may block until room frees up.
    for i := 1; i <= 5; i++ {
        id := i // per-iteration copy for the closure (required before Go 1.22)
        pool.SendJob(func() {
            fmt.Printf("job%d exec start\n", id)
            time.Sleep(4 * time.Second)
            fmt.Printf("job%d exec end\n", id)
        })
    }

    fmt.Printf("job remain:%d\n", pool.RemainJob())

    sendOK := pool.SendJobWithTimeout(func() {
        fmt.Println("SendJobWithTimeout")
    }, 2*time.Second)
    if !sendOK {
        fmt.Println("worker pool is too busy")
    }

    pool.Shutdown()

    // Shutdown returns before the workers have printed "worker exit", so
    // give them a moment. The original `select {}` here would crash with
    // "fatal error: all goroutines are asleep - deadlock!" once every worker
    // had exited.
    time.Sleep(time.Second)
}
使用协程池

第三方库实现了协程池,比如: