1、GoroutinePool代码实现
package pool
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
)
//参考模型:工厂流水线->流水线员工->待加工产品
type Payload struct {
Name string
}
func (p *Payload) Play() {
log.Printf("%s加工完成。\n", p.Name)
}
//任务
type Job struct {
Payload Payload
}
type Worker struct {
WorkerId string //员工ID
Workbench chan Job //员工加工产品的工作台,即来即走(无缓冲)。
GWorkbenchQueue chan chan Job //等待分配加工产品的员工工作台队列
Finished chan bool //员工结束工作通知通道,无缓冲
}
// 新建一条工厂流水线
func NewWorker(WorkbenchQueue chan chan Job, Id string) *Worker {
log.Printf("新建流水线:%s \n", Id)
return &Worker{
WorkerId: Id, //员工ID
Workbench: make(chan Job), //员工加工产品的工作台,即来即走(无缓冲)。
GWorkbenchQueue: WorkbenchQueue, //等待分配加工产品的员工工作台队列
Finished: make(chan bool), //无缓冲
}
}
// 工人开始工作
func (w *Worker) Start() {
//开一个新的协程
go func() {
for {
//将当前未分配待加工产品的工作台添加到工作台队列中
w.GWorkbenchQueue <- w.Workbench
log.Printf("把[%s]的工作台添加到工作台队列中,当前工作台队列长度:%d\n", w.WorkerId, len(w.GWorkbenchQueue))
select {
//接收到了新的WorkerJob
case wJob := <-w.Workbench:
wJob.Payload.Play()
case bFinished := <-w.Finished:
if true == bFinished {
log.Printf("%s 结束工作!\n", w.WorkerId)
return
}
}
}
}()
}
func (w *Worker) Stop() {
//w.QuitChannel <- true
go func() {
w.Finished <- true
}()
}
type Dispatcher struct {
DispatcherId string //流水线ID
MaxWorkers int //流水线上的员工(Worker)最大数量
Workers []*Worker //流水线上所有员工(Worker)对象集合
Closed chan bool //流水线工作状态通道
EndDispatch chan os.Signal //流水线停止工作信号
GJobQueue chan Job //流水线上的所有代加工产品(Job)队列通道
GWorkbenchQueue chan chan Job //流水线上的所有操作台队列通道
}
func NewDispatcher(maxWorkers, maxQueue int) *Dispatcher {
Closed := make(chan bool)
EndDispatch := make(chan os.Signal)
JobQueue := make(chan Job, maxQueue)
WorkbenchQueue := make(chan chan Job, maxWorkers)
signal.Notify(EndDispatch, syscall.SIGINT, syscall.SIGTERM)
return &Dispatcher{
DispatcherId: "调度者",
MaxWorkers: maxWorkers,
Closed: Closed,
EndDispatch: EndDispatch,
GJobQueue: JobQueue,
GWorkbenchQueue: WorkbenchQueue,
}
}
func (d *Dispatcher) Run() {
// 开始运行
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(d.GWorkbenchQueue, fmt.Sprintf("work-%s", strconv.Itoa(i)))
d.Workers = append(d.Workers, worker)
//开始工作
worker.Start()
}
//监控
go d.Dispatch()
}
func (d *Dispatcher) Dispatch() {
FLAG:
for {
select {
case endDispatch := <-d.EndDispatch:
log.Printf("流水线关闭命令[%v]已发出...\n", endDispatch)
close(d.GJobQueue)
case wJob, Ok := <-d.GJobQueue:
if true == Ok {
log.Println("从流水线获取一个待加工产品(Job)")
go func(wJob Job) {
//获取未分配待加工产品的工作台
Workbench := <-d.GWorkbenchQueue
//将待加工产品(Job)放入工作台进行加工
Workbench <- wJob
}(wJob)
} else {
for _, w := range d.Workers {
w.Stop()
}
d.Closed <- true
break FLAG
}
}
}
}
type WorkFlow struct {
GDispatch *Dispatcher
}
func (wf *WorkFlow) StartWorkFlow(maxWorkers, maxQueue int) {
//初始化一个调度器(流水线),并指定该流水线上的员工(Worker)和待加工产品(Job)的最大数量
wf.GDispatch = NewDispatcher(maxWorkers, maxQueue)
//启动流水线
wf.GDispatch.Run()
}
func (wf *WorkFlow) AddJob(wJob Job) {
//向流水线中放入待加工产品(Job)
wf.GDispatch.GJobQueue <- wJob
}
func (wf *WorkFlow) CloseWorkFlow() {
closed := <-wf.GDispatch.Closed
if true == closed {
log.Println("调度器(流水线)已关闭.")
}
}
2、示例
package main
import (
"fmt"
"pool"
"time"
)
func main() {
var wf pool.WorkFlow
//初始化并启动工作
wf.StartWorkFlow(2, 4)
for i := 0; i < 100; i++ {
payload := pool.Payload{
fmt.Sprintf("产品-%08d", i+1),
}
wJob := pool.Job{
Payload: payload,
}
//添加工作
wf.AddJob(wJob)
//time.Sleep(time.Millisecond * 10)
}
wf.CloseWorkFlow()
time.Sleep(time.Second * 20)
}
参考: