这段时间在做一个zfb在线支付项目, 用到了任务队列, 索性就用生产者消费者模型简单的撸了一把; 不足之处, 请多多指教.
1, 文件结构如下:
2, taskqueue.go 代码如下:
package taskqueue
import (
"container/list"
"sync"
"sync/atomic"
)
// 等待协程的数量
var waitRoutine int32 = 0
// Itemer ..
type Itemer interface {
Process()
}
// TaskQueue ..
type TaskQueue struct {
tl *list.List
size int // 队列大小
routine int // 开启协程的数量
sync.Mutex
*sync.Cond
}
// NewTaskQueue ..
func NewTaskQueue(size int, routine int) *TaskQueue {
if size <= 0 {
size = 4
}
if routine <= 0 {
routine = 4
}
taskQueue := &TaskQueue{
size: size,
routine: routine,
}
taskQueue.Cond = sync.NewCond(&taskQueue.Mutex)
taskQueue.tl = &list.List{}
return taskQueue
}
// Run ..
func (taskQueue *TaskQueue) Run() {
for index := 0; index < taskQueue.routine; index++ {
go process(taskQueue)
}
}
// PushItem ..
func (taskQueue *TaskQueue) PushItem(item Itemer) bool {
taskQueue.Mutex.Lock()
if taskQueue.tl.Len() >= taskQueue.size {
taskQueue.Mutex.Unlock()
return false
} else {
taskQueue.tl.PushBack(item)
}
taskQueue.Mutex.Unlock()
if atomic.LoadInt32(&waitRoutine) > 0 {
taskQueue.Cond.Signal()
}
return true
}
func process(taskQueue *TaskQueue) {
for {
taskQueue.Mutex.Lock()
for taskQueue.tl.Len() == 0 {
atomic.AddInt32(&waitRoutine, 1)
taskQueue.Cond.Wait()
atomic.AddInt32(&waitRoutine, -1)
}
item := taskQueue.tl.Front()
taskQueue.tl.Remove(item)
taskQueue.Mutex.Unlock()
item.Value.(Itemer).Process()
}
}
3, main.go为测试代码, 如下:
package main
import (
"fmt"
"task/taskqueue"
)
// FirstStruct ..
type FirstStruct struct {
Msg string
}
// Process ..
func (firstStruct *FirstStruct) Process() {
fmt.Println("this is FirstStruct:", firstStruct.Msg)
}
// SecondStruct ..
type SecondStruct struct {
Msg string
}
// Process ..
func (secondStruct *SecondStruct) Process() {
fmt.Println("this is SecondStruct:", secondStruct.Msg)
}
func main() {
taskQ := taskqueue.NewTaskQueue(1024, 128)
taskQ.Run()
go func() {
first := &FirstStruct{}
first.Msg = "hello"
for index := 0; index < 1000; index++ {
if !taskQ.PushItem(first) {
fmt.Println("push failed, index:", index)
continue
}
}
}()
go func() {
second := &SecondStruct{}
second.Msg = "world"
for index := 0; index < 1000; index++ {
if !taskQ.PushItem(second) {
fmt.Println("push failed, index:", index)
continue
}
}
}()
for {
}
}
打印结果如下:
this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is FirstStruct: hello
this is FirstStruct: hello
this is SecondStruct: world