• 单向队列只允许一端进一端出
  • 双端队列两端都进和出
基于切片的单向队列简单实现
package main

import (
"fmt"
"sync"
)

type QueueInterface interface {
	Len() int
	Empty() bool
	Clear()
	Push(data interface{})
	Pop() interface{}
	Peep() interface{}
}

type Queue  struct {
	data []interface{}
	len int
	lock *sync.Mutex
}

func NewQueue() *Queue  {
	return &Queue{
		data: make([]interface{}, 0),
		len:   0,
		lock:  new(sync.Mutex),
	}
}

func (q *Queue) Clear()  {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.len == 0 {
		return
	}

	q.data = make([]interface{}, 0)
	q.len = 0;
}

func (q *Queue) Len() int  {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.len
}

func (q *Queue) Empty() bool  {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.len == 0
}

func (q *Queue)Push(data interface{})  {
	if data == nil {
		return
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	q.data = append(q.data, data)
	q.len++;
}

func (q *Queue)Pop() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.len == 0 {
		return nil
	}

	var ele interface{} = nil


	ele, q.data = q.data[0], q.data[1:]
	q.len--
	return  ele
}

func (q *Queue)Peep() interface{}  {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.len == 0 {
		return nil
	}


	return q.data[0]
}

func main() {
	q := NewQueue()

	if !q.Empty() || q.len != 0 || q.Len() != 0 {
		println("eerr");
		return
	}

	q.Push(1)
	q.Push(2)
	q.Push(3)

	fmt.Println(q.data)



	q.Pop()
	fmt.Println(q.data)

	q.Pop()
	fmt.Println(q.data)

	q.Pop()
	fmt.Println(q.data)

	q.Pop()
	fmt.Println(q.data)

	q.Pop()
	fmt.Println(q.data)
}

使用链表简单的实现

这个例子用Go语言的包"container/list"实现一个线程安全访问的队列

package main

import (
	"container/list"
	"fmt"
	"sync"
)



type Queue struct {
	data *list.List
	sync.Mutex
}

func NewQueue() *Queue  {
	q := new(Queue)
	q.data = list.New()
	return q
}

func (q *Queue) Push(v interface{})  {
	q.Lock()
	defer q.Unlock()
	q.data.PushFront(v)
}

func (q *Queue) Pop() interface{}  {
	q.Lock()
	defer q.Unlock()

	iter := q.data.Back()
	v := iter.Value
	q.data.Remove(iter)
	return v
}

func (q *Queue) Dump()  {
	for iter := q.data.Back();  iter != nil; iter = iter.Prev()  {
		fmt.Println("item:", iter.Value)
	}
}

func (q *Queue) Empty() bool {
	return q.data.Len() == 0
}


func (q *Queue) Size() int {
	return q.data.Len()
}


func main() {
	q := NewQueue()
	q.Push("foo")
	q.Push("bar")
	q.Push("baz")

	q.Dump()
}




基于切片的单向队列复杂实现
package main

import (
	"errors"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)


var ErrDisposed = errors.New("ErrDisposed");
var ErrTimeout = errors.New("ErrTimeout");
var ErrEmptyQueue = errors.New("ErrEmptyQueue");

type sema struct {
	ready    chan bool
	response *sync.WaitGroup
}

func newSema() *sema {
	return &sema{
		ready:    make(chan bool, 1),
		response: &sync.WaitGroup{},
	}
}





type waiters []*sema
// 比如 waiters = [1, 2, 3, 4], 那么get()之后, 返回1, 但是waiter = [2, 3, 4]
func (w *waiters) get() *sema {
	if len(*w) == 0 {
		return nil
	}

	sema := (*w)[0]
	copy((*w)[0:], (*w)[1:])
	(*w)[len(*w)-1] = nil // or the zero value of T
	*w = (*w)[:len(*w)-1]
	return sema
}

// 追加
func (w *waiters)put(sema *sema)  {
	*w = append(*w, sema)
}

// 删除
func (w *waiters)remove(sema *sema)  {
	if len(*w) == 0{
		return
	}

	// 创建一个新切片,将原来的所有除了sema之外的复制到新切片中
	ws := *w
	newWs := make(waiters, 0, len(*w))
	for i := range ws {
		if ws[i] != sema{
			newWs = append(newWs, ws[i])
		}
	}

	*w = newWs
}

type items []interface{}

// 比如 items = [1, 2, 3, 4, 5, 6, 7, 8]
// 如果number = 3, 那么 返回[1, 2, 3], 但是 items = [4, 5, 6, 7, 8]
// 如果number = 5, 那么 返回[1, 2, 3, 4, 5], 但是 items = [6, 7, 8]
// 如果number = 20, 那么 返回[1, 2, 3, 4, 5, 6, 7, 8], 但是 nil
func (items *items) get(number int64) []interface{} {
	returnItems := make([]interface{}, 0, number)

	// 将items中的值复制到returnItems中
	index := uint64(0) // 统计复制了多少个值
	for i := int64(0); i < number ; i++ {
		if i >= int64(len(*items)) {
			break
		}

		returnItems = append(returnItems, (*items)[i])
		(*items)[i] = nil
		index++
	}

	(*items) = (*items)[index:]
	return returnItems
}

// 获取数组的第一个元素。如果数组长度为0则返回nil,false
func (items *items) peek() (interface{}, bool) {
	length := len(*items)

	if length == 0 {
		return nil, false
	}

	return (*items)[0], true
}


/*
 对队列中的元素做check操作,直到第一个不满足check条件的元素出现则break,返回操作成功的元素:
	* 先检查items的长度,如果没有元素则直接返回nil
	* 否则遍历items,检查item是否匹配check:
		如果匹配成功,将该item追加到结果数组中,然后将队列中相应位置的item置nil
		如果不匹配,结束循环
    * 将队列中的item为nil的元素切割掉
*/
func (items *items) getUntil(checker func(item interface{}) bool) []interface{}{
	length := len(*items)

	if len(*items) == 0 {
		return []interface{}{}
	}

	returnItems := make([]interface{}, 0, length)
	index := -1
	for i, item := range *items {
		if !checker(item) {
			break
		}

		returnItems = append(returnItems, item)
		index = i
		(*items)[i] = nil // prevent memory leak
	}


	*items = (*items)[index+1:]
	return returnItems
}

type Queue struct {
	waiters  waiters
	items    items
	lock     sync.Mutex
	disposed bool  // items是否为nil
}

//Put将向队列中添加指定的项。
/*
 * 如果 len(items) == 0, 没有剩下的位置了,返回nil
 * 如果 q.disposed = true,也就是items = nil,则返回ErrDisposed
 * 否则追加items... 到q.items的后面:
		然后进入for:如果有人在等待获取队列元素,那么告诉他们已经有元素入队了
*/
func (q *Queue)Put(items ... interface{}) error  {
	if (len(items) == 0){
		return nil
	}

	q.lock.Lock()
	if(q.disposed){
		q.lock.Unlock()
		return ErrDisposed
	}

	q.items = append(q.items, items...)
	for{
		sema := q.waiters.get()
		if(sema == nil){
			break
		}

		sema.response.Add(1)
		select {
		case sema.ready <- true:
			sema.response.Wait()
		default:
			// This semaphore timed out.
		}
		if (len(items) == 0){
			break
		}
	}


	q.lock.Unlock()
	return nil
}

/*
Get:获取队列的前n项。
	如果队列长度大于0小于n,返回所有项目
	如果队列长度大于n,返回前n项
	如果队列中没有项目,则此方法阻塞等待,直到项目添加到队列。[不会有ErrTimeout]
*/
func (q *Queue) Get(number int64) ([]interface{}, error) {
	return q.Poll(number, 0)
}

/*
Poll:获取队列的前n项。
	如果队列长度大于0小于n,返回所有项目
	如果队列长度大于n,返回前n项
     如果队列中没有项目,则此方法将阻塞等待,直到项目添加到队列。如果发生超时,则返回ErrTimeout。
             timeout为0时会一直阻塞,直到获取到值
			 timeout不为0时在超时时间之前,会一直阻塞。如果发生超时,则返回ErrTimeout。
*/
func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) {
	if number < 1 {
		// thanks again go
		return []interface{}{}, nil
	}

	q.lock.Lock()

	if q.disposed { // items为nil时
		q.lock.Unlock()
		return nil, ErrDisposed // 返回 没有 位置可以 排队
	}

	var items []interface{}

	// 如果队列为空,那么在timeout之前阻塞等待,
	if len(q.items) == 0 {
		sema := newSema()
		q.waiters.put(sema)
		q.lock.Unlock()

		var timeoutC <-chan time.Time
		if timeout > 0 {
			timeoutC = time.After(timeout)
		}
		select {
		case <-sema.ready:
			// we are now inside the put's lock
			if q.disposed {
				return nil, ErrDisposed
			}
			items = q.items.get(number)
			sema.response.Done()
			return items, nil
		case <-timeoutC:
			// cleanup the sema that was added to waiters
			select {
			case sema.ready <- true: // 向sema.ready中写入true
				// 在调用Put()之前 从 waiters中删除 sema
				q.lock.Lock()
				q.waiters.remove(sema)
				q.lock.Unlock()
			default:
				// Put() got it already, we need to call Done() so Put() can move on
				sema.response.Done()
			}
			return nil, ErrTimeout
		}
	}

	// 当队列不为空时,获取前number项
	items = q.items.get(number)
	q.lock.Unlock()
	return items, nil
}
/* 获取队首元素:
     先检查items是否为nil
		为nil,返回nil和ErrDisposed
		不为nil:
			items的长度为0,返回nil,false
				返回nil, ErrEmptyQueue
			items的长度大于0,返回itemsp[0],nil
 *
*/
func (q *Queue) Peek() (interface{}, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.disposed { // 如果没有位置可以放置item,也就是items为nil
		return nil, ErrDisposed
	}

	// 获取队首元素,也就是items数组的第一个元素
	// 当前,如果items数组长度为0,则返回nil,false
	peekItem, ok := q.items.peek()
	if !ok {
		return nil, ErrEmptyQueue
	}

	return peekItem, nil
}

/*
takenuntil接受一个函数并返回一个匹配检查器,直到检查器返回false。如果队列中没有项目,则不会等待。
*/
func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
	if checker == nil {
		return nil, nil
	}

	q.lock.Lock()

	if q.disposed {
		q.lock.Unlock()
		return nil, ErrDisposed
	}

	result := q.items.getUntil(checker)
	q.lock.Unlock()
	return result, nil
}

func (q *Queue) Empty() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return len(q.items) == 0
}

// Len returns the number of items in this queue.
func (q *Queue) Len() int64 {
	q.lock.Lock()
	defer q.lock.Unlock()

	return int64(len(q.items))
}

/*
Disposed返回一个bool,指示此队列是否
已经准备好了。
*/
func (q *Queue) Disposed() bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.disposed
}
/*
Dispose获取队列剩下的所有项【包括正在放入的项】,并将items队列和waiters为nil
   任何后续的Get或Put调用都将返回一个错误。【因为items队列和waiters为nil了】
*/
func (q *Queue) Dispose() []interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.disposed = true
	for _, waiter := range q.waiters { // 是否需要等待
		waiter.response.Add(1)
		select {
		case waiter.ready <- true:
			// release Poll immediately
		default:
			// ignore if it's a timeout or in the get
		}
	}

	disposedItems := q.items

	q.items = nil
	q.waiters = nil

	return disposedItems
}

// New is a constructor for a new threadsafe queue.
func New(hint int64) *Queue {
	return &Queue{
		waiters:  nil,
		items:    make([]interface{}, 0, hint),
		lock:     sync.Mutex{},
		disposed: false,  // items不为nil
	}

}
/*
ExecuteInParallel将(并行)调用提供的函数队列中的每个项目,直到队列用尽。
当队列耗尽时,执行完成,所有goroutine都将被终止。这意味着队列将被释放,因此不能再次使用。
	* 先检查q是否为nil,如果为nil,直接返回
	* 获取q的items元素个数,如果元素个数为0,则直接返回
	* 上锁
	* 获取当前系统核数,用于创建并发协程数,有多少个核就创建多少个协程
	*     令items := q.items
	* 等待所有协程完成之后解锁,令q的items为nil,然后结束程序
*/
func ExecuteInParallel(q *Queue, fn func(interface{})) {
	if q == nil {
		return
	}


	// of this process
	todo, done := uint64(len(q.items)), int64(-1)
	// this is important or we might face an infinite loop
	if todo == 0 {
		return
	}

	q.lock.Lock() // so no one touches anything in the middle

	numCPU := 1
	if runtime.NumCPU() > 1 {
		numCPU = runtime.NumCPU() - 1
	}

	var wg sync.WaitGroup
	wg.Add(numCPU)
	items := q.items

	for i := 0; i < numCPU; i++ {
		go func() {
			for {
				index := atomic.AddInt64(&done, 1)
				if index >= int64(todo) {
					wg.Done()
					break
				}

				fn(items[index]) // 获取元素索引
				items[index] = 0  // 令items的元素全部变成0
			}
		}()
	}
	wg.Wait()


	q.lock.Unlock()
	q.Dispose()
}

测试:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

func TestPut(t *testing.T) {
	q := New(10)

	q.Put(`test`)
	if q.Len() != int64(1){
		t.Fatal("q.len must = 1 , but = ", q.Len())
	}

	q.Put(`test1`)
	results, err := q.Get(3)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}

	result := results[0]
	if(result != `test`){
		t.Fatal("result should = `test` , but = ", result)
	}

	if(q.Empty() != true){
		t.Fatal("q.Empty() is true, but hash ", q.Empty())
	}

}

func TestGet(t *testing.T) {
	q := New(10)

	q.Put(`test`)
	result, err := q.Get(2)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}

	if q.Len() != int64(0){
		t.Fatal("q.len must = 0 , but = ", q.Len())
	}

	if len(result) != int(1){
		t.Fatal("len(result) must = 1 , but = ", len(result))
	}

	if(result[0] != `test`){
		t.Fatal("result should = `test` , but = ", result)
	}

	if q.Len() != int64(0){
		t.Fatal("q.len must = 0 , but = ", q.Len())
	}



	q.Put(`1`)
	q.Put(`2`)

	result, err = q.Get(1)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}
	if len(result) != int(1){
		t.Fatal("len(result) must = 1 , but = ", len(result))
	}
	if(result[0] != `1`){
		t.Fatal("result should = `1` , but = ", result)
	}
	if q.Len() != int64(1){
		t.Fatal("q.len must = 1 , but = ", q.Len())
	}


	result, err = q.Get(2)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}

	if(result[0] != `2`){
		t.Fatal("result should = `2` , but = ", result)
	}
}


func TestPoll(t *testing.T) {
	q := New(10)

	// should be able to Poll() before anything is present, without breaking future Puts
	result, err := q.Poll(1,  time.Microsecond)
	if err !=  ErrTimeout{
		t.Fatal("ErrTimeout but,  ", err)
	}

	q.Put(`test`)
	result, err = q.Poll(2, 0)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}
	if len(result) != int(1){
		t.Fatal("len(result) must = 1 , but = ", len(result))
	}
	if(result[0] != `test`){
		t.Fatal("result should = `test` , but = ", result)
	}
	if q.Len() != int64(0){
		t.Fatal("q.len must = 0 , but = ", q.Len())
	}

	q.Put(`1`)
	q.Put(`2`)

	result, err = q.Poll(1, time.Millisecond)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}
	if(result[0] != `1`){
		t.Fatal("result should = `1` , but = ", result)
	}
	if q.Len() != int64(1){
		t.Fatal("q.len must = 1 , but = ", q.Len())
	}


}

func TestPollNoMemoryLeak(t *testing.T) {
	q := New(0)

	if (len(q.waiters) != 0){
		t.Fatal("q.len.waiters must = 0 , but = ", q.Len())
	}


	for i := 0; i < 10; i++ {
		// Poll() should cleanup waiters after timeout
		q.Poll(1, time.Nanosecond)  // 阻塞,直到超时或者获取到值才返回
		if (len(q.waiters) != 0){
			t.Fatal("q.len.waiters must = 0 , but = ", q.Len())
		}
	}
}
func TestAddEmptyPut(t *testing.T) {
	q := New(10)

	q.Put()  // 空则直接返回

	if q.Len() != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, q.Len())
	}
}


func TestGetNonPositiveNumber(t *testing.T) {
	q := New(10)

	q.Put(`test`)
	result, err := q.Get(0)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}

	if len(result) != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, len(result))
	}
}

func TestEmpty(t *testing.T) {
	q := New(10)

	if !q.Empty() {
		t.Errorf(`Expected empty queue.`)
	}

	q.Put(`test`)
	if q.Empty() {
		t.Errorf(`Expected non-empty queue.`)
	}
}

func TestGetEmpty(t *testing.T) {
	q := New(10)

	go func() {
		q.Put(`a`)
	}()

	result, err := q.Get(2)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}
	if(result[0] != `a`){
		t.Fatal("result should = `a` , but = ", result)
	}
	if q.Len() != int64(0){
		t.Fatal("q.len must = 0 , but = ", q.Len())
	}
}

func TestMultipleGetEmpty(t *testing.T) {
	q := New(10)
	var wg sync.WaitGroup
	wg.Add(2)
	results := make([][]interface{}, 2)

	go func() {
		wg.Done()
		local, err := q.Get(1)
		if err != nil{
			t.Fatal("q.Get  has err , because ", err)
		}
		results[0] = local
		wg.Done()
	}()

	go func() {
		wg.Done()
		local, err := q.Get(1)
		if err != nil{
			t.Fatal("q.Get  has err , because ", err)
		}
		results[1] = local
		wg.Done()
	}()

	fmt.Println(results)
	wg.Wait() //[[] []]
	wg.Add(2)
	q.Put(`a`, `b`, `c`)

	// `The array should be a, b or b, a`
	wg.Wait()  //[[a] [b]]
	fmt.Println(results)

}



func TestDispose(t *testing.T) {
	// when the queue is empty
	q := New(10)
	itemsDisposed := q.Dispose()
	if len(itemsDisposed) != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, len(itemsDisposed))
	}


	// when the queue is not empty
	q = New(10)
	q.Put(`1`)
	itemsDisposed = q.Dispose()

	expected := []interface{}{`1`}
	if itemsDisposed[0] != expected[0] {
		t.Errorf(`Expected len: %d, received: %d`, expected, itemsDisposed)
	}



	// when the queue has been disposed
	itemsDisposed = q.Dispose()
	if itemsDisposed != nil {
		t.Errorf(`Expected itemsDisposed: nil, received: %s`, itemsDisposed)
	}
}

func TestEmptyGetWithDispose(t *testing.T) {
	q := New(10)
	var wg sync.WaitGroup
	wg.Add(1)

	var err error

	go func() {
		wg.Done()
		_, err = q.Get(1) // 后执行,为什么??????
		wg.Done()
	}()

	wg.Wait()
	wg.Add(1)
	q.Dispose() // 先执行

	wg.Wait()

	if err != ErrDisposed{
		t.Fatal(err)
	}
}


func TestDisposeAfterEmptyPoll(t *testing.T) {
	q := New(10)

	// 主线程会一直阻塞,因为没有放入项,所以会超时
	_, err := q.Poll(1, time.Millisecond)
	if err != ErrTimeout{
		t.Fatal(err)
	}

	// it should not hang
	q.Dispose()  // 没有位置可以防止元素

	_, err = q.Poll(1, time.Millisecond)
	if err != ErrDisposed{
		t.Fatal(err)
	}
}


func BenchmarkQueue(b *testing.B) {
	q := New(int64(b.N))
	var wg sync.WaitGroup
	wg.Add(1)
	i := 0

	go func() {
		for {
			q.Get(1)
		//	result, err := q.Get(1)  // 放多少拿多少
/*			if err != nil{
				b.Fatal(err)
			}
			b.Log(result)*/
			i++

			if i == b.N {  // 直到放置的全部拿完,才结束当前协程
				wg.Done()
				break
			}
		}
	}()

	for i := 0; i < b.N; i++ {
		q.Put(i)
	}

	wg.Wait()
}

func BenchmarkChannel(b *testing.B) {
	ch := make(chan interface{}, 1)
	var wg sync.WaitGroup
	wg.Add(1)
	i := 0

	go func() {
		for{
			<- ch
			i++
			if i == b.N {  // 直到放置的全部拿完,才结束当前协程
				wg.Done()
				break
			}
		}
	}()

	for i := 0; i < b.N; i++ {
		ch <- `a`
	}

	wg.Wait()
}

func TestPeek(t *testing.T) {
	q := New(10)
	q.Put(`a`)
	q.Put(`b`)
	q.Put(`c`)
	peekResult, err := q.Peek()
	peekExpected := `a`
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}

	if q.Len() != int64(3){
		t.Fatal("q.len must = 3 , but = ", q.Len())
	}

	if peekExpected != peekResult {
		t.Errorf(`Expected len: %s, received: %s`, peekExpected, peekResult)
	}



	popResult, err := q.Get(1)
	if err != nil{
		t.Fatal("q.Get  has err , because ", err)
	}
	if peekExpected != popResult[0] {
		t.Errorf(`Expected len: %d, received: %d`, peekResult, popResult[0])
	}
	if q.Len() != int64(2){
		t.Fatal("q.len must = 2 , but = ", q.Len())
	}
}

func TestPeekOnDisposedQueue(t *testing.T) {
	q := New(10)
	q.Dispose()
	result, err := q.Peek()
	if result != nil{
		t.Fatal("当前items为nil,获取队首元素时应该结果返回nil,但是获取了 ", result)
	}

	if err != ErrDisposed{
		t.Fatal(err)
	}
}

func TestTakeUntil(t *testing.T) {
	q := New(10)
	q.Put(`a`, `b`, `c`)
	result, err := q.TakeUntil(func(item interface{}) bool {
		return item != `c`
	})

	if err != nil{
		t.Fatal("q.TakeUntil  ", err)
	}

	expected := []interface{}{`a`, `b`}
	if expected[0] != result[0] || expected[1] != result[1] {
		t.Errorf(`Expected : %s, received: %s`, expected, result)
	}
}


func TestTakeUntilThenGet(t *testing.T) {
	q := New(10)
	q.Put(`a`, `b`, `c`)
	takeItems, _ := q.TakeUntil(func(item interface{}) bool {
		return item != `c`
	})

	restItems, _ := q.Get(3)

	// 2个不满足要求的已经排除
	if takeItems[0] != `a` ||takeItems[1] != `b` {
		t.Errorf(`Expected : %s, received: %s`, []interface{}{`a`, `b`}, takeItems)
	}

	// 只剩下一个`c`
	if restItems[0] != `c` || len(restItems) != 1 {
		t.Errorf(`Expected : %s, received: %s`, []interface{}{`c`}, takeItems)
	}
}


func TestWaiters(t *testing.T) {
	s1, s2, s3, s4 := newSema(), newSema(), newSema(), newSema()

	// type waiters []*sema
	w := waiters{}
	if (len(w) != 0){
		t.Fatal("q.len(waiters) should 0 , but = ", len(w))
	}


	// append数组
	w.put(s1)

	t.Log(w) // waiters{s1}

	w.put(s2)
	w.put(s3)
	w.put(s4)

	t.Log(w) //waiters{s1, s2, s3, s4}


	// 将除了s2之外的所有元素复制到新数组,然后w = newW
	w.remove(s2)
	t.Log(w) //waiters{s1, s3, s4}

	// remove non-existing element
	w.remove(s2)
	t.Log(w) //waiters{s1, s3, s4}

	// remove from beginning
	w.remove(s1)
	t.Log(w) //waiters{s3, s4}


	// remove from end
	w.remove(s4)
	t.Log(w) //waiters{s4}


	// remove last element
	w.remove(s3)
	if (len(w) != 0){
		t.Fatal("q.len(waiters) should 0 , but = ", len(w))
	}


	// remove non-existing element
	w.remove(s3)
	if (len(w) != 0){
		t.Fatal("q.len(waiters) should 0 , but = ", len(w))
	}

	//
	// test get()
	//
	// start with 3 elements in list
	w.put(s1)
	w.put(s2)
	w.put(s3)


	// get() returns each item in insertion order
//	assert.Equal(t, s1, w.get())
//	assert.Equal(t, s2, w.get())
	w.put(s4) // interleave a put(), item should go to the end
//	assert.Equal(t, s3, w.get())
	//assert.Equal(t, s4, w.get())
	//assert.Empty(t, w)
	//assert.Nil(t, w.get())
}


func TestExecuteInParallel(t *testing.T) {
	q := New(10)
	var i int64 = 0
	for ; i < 10; i++ {
		q.Put(i)
	}

	numCalls := int64(0)

	ExecuteInParallel(q, func(item interface{}) {
//		t.Logf("ExecuteInParallel called us with %+v", item)
		atomic.AddInt64(&numCalls, item.(int64))
	})


	if q.Disposed() != true{
		t.Fatal("q.items should nil, but", q.items)
	}

	t.Log(numCalls)

	if int64(45) != numCalls{
		t.Fatal("numCalls should 10, but", numCalls)
	}
}


func TestExecuteInParallelEmptyQueue(t *testing.T) {
	q := New(1)

	// 队列为空也不会死锁
	ExecuteInParallel(q, func(interface{}) {
		t.Fail()
	})
}

func BenchmarkQueuePut(b *testing.B) {
	numItems := int64(1000)

	qs := make([]*Queue, 0, b.N)

	for i := 0; i < b.N; i++ {
		q := New(10)
		qs = append(qs, q)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		q := qs[i]
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
	}
}


func BenchmarkQueueGet(b *testing.B) {
	numItems := int64(1000)

	qs := make([]*Queue, 0, b.N)

	for i := 0; i < b.N; i++ {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs = append(qs, q)
	}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		q := qs[i]
		for j := int64(0); j < numItems; j++ {
			q.Get(1)
		}
	}
}

func BenchmarkQueuePoll(b *testing.B) {
	numItems := int64(1000)

	qs := make([]*Queue, 0, b.N)

	for i := 0; i < b.N; i++ {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs = append(qs, q)
	}

	b.ResetTimer()

	for _, q := range qs {
		for j := int64(0); j < numItems; j++ {
			q.Poll(1, time.Millisecond)
		}
	}
}

func BenchmarkExecuteInParallel(b *testing.B) {
	numItems := int64(1000)

	qs := make([]*Queue, 0, b.N)

	for i := 0; i < b.N; i++ {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs = append(qs, q)
	}

	var counter int64
	fn := func(ifc interface{}) {
		c := ifc.(int64)
		atomic.AddInt64(&counter, c)
	}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		q := qs[i]
		ExecuteInParallel(q, fn)
	}
}