这段时间在做一个支付宝(zfb)在线支付项目, 用到了任务队列, 索性就用生产者-消费者模型简单地实现了一个; 不足之处, 请多多指教.

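1, 文件结构如下 (原文此处的截图缺失; 以下结构根据 main.go 中的导入路径 "task/taskqueue" 推断):

task/
├── main.go
└── taskqueue/
    └── taskqueue.go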

2, taskqueue.go 代码如下:

package taskqueue

import (
	"container/list"
	"sync"
	"sync/atomic"
)

// waitRoutine counts the worker goroutines currently parked in Cond.Wait.
// It is a package-level counter, so it is shared by every TaskQueue, and it
// is only ever touched through sync/atomic.
var waitRoutine int32

// Itemer is the unit of work accepted by a TaskQueue. A worker goroutine
// calls Process once on each item it removes from the queue.
type Itemer interface {
	// Process performs the item's work. Workers call it after releasing the
	// queue mutex.
	Process()
}

// TaskQueue is a bounded FIFO of Itemer values drained by a fixed number of
// worker goroutines. Build one with NewTaskQueue and start the workers with
// Run.
type TaskQueue struct {
	// tl holds the pending items; producers append at the back and workers
	// take from the front.
	tl *list.List

	size    int // maximum number of pending items; PushItem refuses beyond this
	routine int // number of worker goroutines started by Run

	// Mutex guards tl. Because it is embedded, Lock and Unlock are promoted
	// onto TaskQueue itself.
	sync.Mutex
	// Cond is created on Mutex in NewTaskQueue; workers wait on it while the
	// list is empty and PushItem signals it.
	*sync.Cond
}

// NewTaskQueue builds a TaskQueue that holds at most size pending items and
// will be served by routine worker goroutines once Run is called. A
// non-positive size or routine falls back to 4. No goroutine is started here.
func NewTaskQueue(size int, routine int) *TaskQueue {
	const fallback = 4
	if size < 1 {
		size = fallback
	}
	if routine < 1 {
		routine = fallback
	}

	q := new(TaskQueue)
	q.size = size
	q.routine = routine
	q.tl = list.New()
	// The condition variable must use the queue's own mutex so that waiting
	// workers and producers synchronise on the same lock.
	q.Cond = sync.NewCond(&q.Mutex)
	return q
}

// Run starts the configured number of worker goroutines and returns
// immediately. The workers loop forever, and every call to Run starts
// another full batch of them.
func (taskQueue *TaskQueue) Run() {
	for remaining := taskQueue.routine; remaining > 0; remaining-- {
		go process(taskQueue)
	}
}

// PushItem appends item to the back of the queue without blocking.
//
// It returns false, leaving the queue untouched, when item is nil or when the
// queue already holds size pending items; otherwise it returns true. A nil
// item is refused because a worker would later panic when asserting the
// stored value back to Itemer.
func (taskQueue *TaskQueue) PushItem(item Itemer) bool {
	if item == nil {
		return false
	}

	taskQueue.Mutex.Lock()
	if taskQueue.tl.Len() >= taskQueue.size {
		taskQueue.Mutex.Unlock()
		return false
	}
	taskQueue.tl.PushBack(item)
	taskQueue.Mutex.Unlock()

	// Signal only when some worker has announced that it is waiting. The
	// signal is sent after the mutex is released so the woken worker can take
	// the lock straight away.
	if atomic.LoadInt32(&waitRoutine) > 0 {
		taskQueue.Cond.Signal()
	}

	return true
}

// process is the body of one worker goroutine. It loops forever: wait until
// the queue is non-empty, remove the front item while holding the mutex, then
// run the item's Process method with the mutex released.
func process(taskQueue *TaskQueue) {
	for {
		taskQueue.Lock()
		for taskQueue.tl.Len() == 0 {
			// Announce the wait before parking so PushItem knows a signal is
			// needed. Both counter updates happen with the mutex held,
			// because Wait only returns after re-acquiring it.
			atomic.AddInt32(&waitRoutine, 1)
			taskQueue.Wait()
			atomic.AddInt32(&waitRoutine, -1)
		}

		// List.Remove returns the removed element's Value.
		payload := taskQueue.tl.Remove(taskQueue.tl.Front())
		taskQueue.Unlock()

		payload.(Itemer).Process()
	}
}

3, main.go为测试代码, 如下:

package main

import (
	"fmt"
	"os"
	"os/signal"

	"task/taskqueue"
)

// FirstStruct is a sample task type. Its pointer type has a Process method,
// so a *FirstStruct can be pushed onto the task queue.
type FirstStruct struct {
	Msg string // text printed by Process
}

// Process prints a fixed FirstStruct tag followed by Msg to standard output.
func (f *FirstStruct) Process() {
	msg := f.Msg
	fmt.Println("this is FirstStruct:", msg)
}

// SecondStruct is a second sample task type. Its pointer type has a Process
// method, so a *SecondStruct can be pushed onto the task queue.
type SecondStruct struct {
	Msg string // text printed by Process
}

// Process prints a fixed SecondStruct tag followed by Msg to standard output.
func (s *SecondStruct) Process() {
	msg := s.Msg
	fmt.Println("this is SecondStruct:", msg)
}

// main is a small demo of the task queue. It creates a queue with room for
// 1024 pending items and 128 workers, then starts two producer goroutines
// that each push the same task value 1000 times and report every push the
// queue refuses. The program keeps running until it is interrupted.
func main() {
	taskQ := taskqueue.NewTaskQueue(1024, 128)
	taskQ.Run()

	go func() {
		first := &FirstStruct{Msg: "hello"}
		for index := 0; index < 1000; index++ {
			if !taskQ.PushItem(first) {
				fmt.Println("push failed, index:", index)
			}
		}
	}()

	go func() {
		second := &SecondStruct{Msg: "world"}
		for index := 0; index < 1000; index++ {
			if !taskQ.PushItem(second) {
				fmt.Println("push failed, index:", index)
			}
		}
	}()

	// Block until Ctrl-C instead of spinning in an empty loop, which would
	// burn a whole core. Waiting on a signal rather than a bare select{} also
	// keeps the runtime from reporting a deadlock once every worker is idle.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
}

打印结果如下:

this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is FirstStruct: hello
this is SecondStruct: world
this is FirstStruct: hello
this is FirstStruct: hello
this is FirstStruct: hello
this is SecondStruct: world