使用channel实现协程池

通过 Channel 实现 Goroutine Pool,缺点是会造成协程的频繁开辟和注销,但好在简单灵活通用。

package mainimport (    "fmt"    "io/ioutil"    "net/http"    "sync")// Pool goroutine Pooltype Pool struct {    queue chan int    wg    *sync.WaitGroup}// New 新建一个协程池func New(size int) *Pool {    if size <= 0 {        size = 1    }    return &Pool{        queue: make(chan int, size),        wg:    &sync.WaitGroup{},    }}// Add 新增一个执行func (p *Pool) Add(delta int) {    // delta为正数就添加    for i := 0; i < delta; i++ {        p.queue <- 1    }    // delta为负数就减少    for i := 0; i > delta; i-- {        <-p.queue    }    p.wg.Add(delta)}// Done 执行完成减一func (p *Pool) Done() {    <-p.queue    p.wg.Done()}func (p *Pool) Wait() {    p.wg.Wait()}func main() {    // 这里限制100个并发    pool := New(100) // sync.WaitGroup{}    //假设需要发送1000万个http请求,然后我并发100个协程取完成这件事    for i := 0; i < 10000000; i++ {        pool.Add(1) //发现已存在100个人正在发了,那么就会卡住,直到有人完成了宣布自己退出协程了        go func(i int) {            resp, err := http.Get("https://www.baidu.com")            if err != nil {                fmt.Println(i, err)            } else {                defer resp.Body.Close()                result, _ := ioutil.ReadAll(resp.Body)                fmt.Println(i, string(result))            }            pool.Done()        }(i)    }    pool.Wait()}

消费者模式实现协程池

频繁对协程开辟与剔除,如果对性能有着很高的要求,建议优化成固定数目的协程取 channel 里面取数据进行消费,这样可以避免协程的创建与注销。

package mainimport (    "fmt"    "strconv"    "sync")// 任务对象type task struct {    Production    Consumer}// 设置消费者数目,也就是work pool大小func (t *task) setConsumerPoolSize(poolSize int) {    t.Production.Jobs = make(chan *Job, poolSize*10)    t.Consumer.WorkPoolNum = poolSize}// 任务数据对象type Job struct {    Data string}func NewTask(handler func(jobs chan *Job) (b bool)) (t *task) {    t = &task{        Production: Production{Jobs: make(chan *Job, 100)},        Consumer:   Consumer{WorkPoolNum: 10, Handler: handler},    }    return}type Production struct {    Jobs chan *Job}func (c Production) AddData(data *Job) {    c.Jobs <- data}type Consumer struct {    WorkPoolNum int    Handler     func(chan *Job) (b bool)    Wg          sync.WaitGroup}// 异步开启多个work去处理任务,但是所有work执行完毕才会退出程序func (c *Consumer) disposeData(data chan *Job) {    for i := 0; i <= c.WorkPoolNum; i++ {        c.Wg.Add(1)        go func() {            defer func() {                c.Wg.Done()            }()            c.Handler(data)        }()    }    c.Wg.Wait()}func main() {    // 实现一个用于处理数据的闭包,实现业务代码    consumerHandler := func(jobs chan *Job) (b bool) {        for jobs := range jobs {            fmt.Println(jobs)        }        return    }    // new一个任务处理对象    t := NewTask(consumerHandler)    t.setConsumerPoolSize(500) // 500个协程同时消费    // 根据自己的业务去生成数据通过AddData方法添加数据到生成channel,这里是100万条数据    go func() {        for i := 0; i < 1000000; i++ {            job := new(Job)            iStr := strconv.Itoa(i)            job.Data = "定义任务数据格式" + iStr            t.AddData(job)        }    }()    // 消费者消费数据    t.Consumer.disposeData(t.Production.Jobs)}

原文链接:https://www.cnblogs.com/niuben/p/15006362.html