先上代码,有兴趣的可以跑一下
可以实现在有限的线程里执行多个任务,控制内存使用,防止内存飙高
package main import ( "context" "fmt" "strconv" "sync" "time" ) // 定义静态变量 用于外部访问内部方法 var pool *_Pool // 定义空结构体,相当于类,和下面组合New方法起来就是pool类的静态方法 type _Pool struct { } // 定义接口 可传任意参数 type TaskFunc func(args ...interface{}) // 定义任务实体,里面有方法和参数 type Task struct { f TaskFunc args interface{} } // 定义线程池对象 type WorkPool struct { Pool chan *Task //定义任务池 WorkCount int //工作线程数量,决定初始化几个goroutine StopCtx context.Context //上下文 StopCancel context.CancelFunc WG sync.WaitGroup //阻塞计数器 } //任务执行 func (t *Task) Execute(args ...interface{}) { t.f(args...) } // 实例化一个新线程池 func (*_Pool) New(workerCount int, len int) *WorkPool { return &WorkPool{ WorkCount: workerCount, Pool: make(chan *Task, len), } } // 任务入队 func (w *WorkPool) PushTask(task *Task) { w.Pool <- task } // 任务调度 go协程从channel里取任务执行Execute方法 func (w *WorkPool) Work(wid int) { for { select { case <-w.StopCtx.Done(): w.WG.Done() fmt.Printf("线程%d 退出执行了 ", wid) return case t := <-w.Pool: if t != nil { t.Execute() fmt.Printf("f被线程%d执行了,参数为%v ", wid, t.args) } } } } //启动线程池,触发任务调度 func (w *WorkPool) Start() *WorkPool { //定义好worker数量 w.WG.Add(w.WorkCount) w.StopCtx, w.StopCancel = context.WithCancel(context.Background()) for i := 0; i < w.WorkCount; i++ { //定义多少个协程来工作 go w.Work(i) } return w } // 停止执行任务,回收正在执行任务的协程 协程计数器减1 直到变成0退出,否则阻塞 func (w *WorkPool) Stop() { w.StopCancel() w.WG.Wait() } func main() { // 任务计数器 taskWg := sync.WaitGroup{} workerCount := 2 taskCount := 10 // 启动线程池 len=channel通道容量,超过容量生产者阻塞,容量变成0 消费者阻塞 pool := pool.New(workerCount, 5).Start() taskWg.Add(taskCount) //构建任务 放入线程池 for i := 0; i < taskCount; i++ { task := &Task{ args: "zhangSan" + strconv.FormatInt(int64(i), 10), f: func(args ...interface{}) { time.Sleep(time.Second) taskWg.Done() // 任务完成计数器减一 }, } pool.PushTask(task) } fmt.Println("任务入队完成") //等待任务执行完成 taskWg.Wait() // 回收资源 close(pool.Pool) fmt.Println("任务全部执行完成") }
运行效果
f被线程0执行了,参数为zhangSan0 f被线程1执行了,参数为zhangSan1 f被线程1执行了,参数为zhangSan3 f被线程0执行了,参数为zhangSan2 任务入队完成 f被线程0执行了,参数为zhangSan5 f被线程1执行了,参数为zhangSan4 f被线程1执行了,参数为zhangSan7 f被线程0执行了,参数为zhangSan6 f被线程1执行了,参数为zhangSan8 任务全部执行完成
里面代码注释的很详细了,就不赘述了