基于切片的单向队列简单实现
- 单向队列只允许从一端入队、从另一端出队
- 双端队列的两端都可以入队和出队
package main
import (
"fmt"
"sync"
)
// QueueInterface lists the operations of a simple first-in, first-out queue.
// The behaviour notes below describe the Queue implementation in this file.
type QueueInterface interface {
	// Push adds data at the tail of the queue.
	Push(data interface{})
	// Pop removes the head element and returns it (nil when the queue is empty).
	Pop() interface{}
	// Peep returns the head element without removing it (nil when empty).
	Peep() interface{}
	// Len reports how many elements are queued.
	Len() int
	// Empty reports whether no elements are queued.
	Empty() bool
	// Clear discards every queued element.
	Clear()
}
// Queue is a mutex-guarded FIFO queue backed by a slice.
type Queue struct {
	data []interface{} // queued elements; the head is data[0]
	len  int           // number of queued elements, kept equal to len(data)
	lock *sync.Mutex   // guards data and len
}

// NewQueue returns an empty queue that is ready to use.
func NewQueue() *Queue {
	return &Queue{
		data: make([]interface{}, 0),
		len:  0,
		lock: new(sync.Mutex),
	}
}

// Clear discards every queued element. It is a no-op on an empty queue.
func (q *Queue) Clear() {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.len == 0 {
		return
	}
	// A fresh slice lets the old backing array (and its elements) be collected.
	q.data = make([]interface{}, 0)
	q.len = 0
}

// Len reports the number of queued elements.
func (q *Queue) Len() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	return q.len
}

// Empty reports whether the queue holds no elements.
func (q *Queue) Empty() bool {
	q.lock.Lock()
	defer q.lock.Unlock()
	return q.len == 0
}

// Push appends data to the tail of the queue. A nil value is silently
// ignored, because Pop and Peep use nil to signal "queue is empty".
func (q *Queue) Push(data interface{}) {
	if data == nil {
		return
	}
	q.lock.Lock()
	defer q.lock.Unlock()
	q.data = append(q.data, data)
	q.len++
}

// Pop removes the head element and returns it, or returns nil when the
// queue is empty.
func (q *Queue) Pop() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.len == 0 {
		return nil
	}
	head := q.data[0]
	// Reslicing alone leaves the popped value reachable through the backing
	// array; clear the slot so the garbage collector can reclaim it.
	q.data[0] = nil
	q.data = q.data[1:]
	q.len--
	return head
}

// Peep returns the head element without removing it, or nil when the queue
// is empty.
func (q *Queue) Peep() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.len == 0 {
		return nil
	}
	return q.data[0]
}
// main is a small demo of the slice-backed queue: it checks that a new queue
// is empty, pushes 1, 2, 3 and then prints the backing slice after each of
// five pops (the last two pops hit an already-empty queue).
func main() {
	q := NewQueue()
	if !(q.Empty() && q.len == 0 && q.Len() == 0) {
		println("eerr")
		return
	}

	for _, v := range []int{1, 2, 3} {
		q.Push(v)
	}
	fmt.Println(q.data)

	for i := 0; i < 5; i++ {
		q.Pop()
		fmt.Println(q.data)
	}
}
使用链表简单的实现
这个例子用Go语言的包"container/list"实现一个线程安全访问的队列
package main
import (
"container/list"
"fmt"
"sync"
)
// Queue is a FIFO queue built on container/list. The embedded mutex guards
// every access to data. Because the mutex is embedded, Lock and Unlock are
// also exported on Queue; do not call a Queue method while holding it.
type Queue struct {
	data *list.List
	sync.Mutex
}

// NewQueue returns an empty queue.
func NewQueue() *Queue {
	return &Queue{data: list.New()}
}

// Push enqueues v. New values go to the front of the list, so the oldest
// value is always at the back.
func (q *Queue) Push(v interface{}) {
	q.Lock()
	defer q.Unlock()
	q.data.PushFront(v)
}

// Pop dequeues and returns the oldest value. It returns nil when the queue
// is empty instead of dereferencing a nil list element.
func (q *Queue) Pop() interface{} {
	q.Lock()
	defer q.Unlock()
	oldest := q.data.Back()
	if oldest == nil {
		return nil
	}
	return q.data.Remove(oldest)
}

// Dump prints every queued value, oldest first, one per line.
func (q *Queue) Dump() {
	q.Lock()
	defer q.Unlock()
	for e := q.data.Back(); e != nil; e = e.Prev() {
		fmt.Println("item:", e.Value)
	}
}

// Empty reports whether the queue holds no values.
func (q *Queue) Empty() bool {
	q.Lock()
	defer q.Unlock()
	return q.data.Len() == 0
}

// Size reports the number of queued values.
func (q *Queue) Size() int {
	q.Lock()
	defer q.Unlock()
	return q.data.Len()
}
// main enqueues three strings and dumps the queue in insertion order.
func main() {
	q := NewQueue()
	for _, word := range []string{"foo", "bar", "baz"} {
		q.Push(word)
	}
	q.Dump()
}
基于切片的单向队列复杂实现
package main
import (
"errors"
"runtime"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors returned by the Queue operations in this file.
var (
	// ErrDisposed is returned when an operation is attempted on a disposed queue.
	ErrDisposed = errors.New("ErrDisposed")
	// ErrTimeout is returned by Poll when its timeout expires before items arrive.
	ErrTimeout = errors.New("ErrTimeout")
	// ErrEmptyQueue is returned by Peek when the queue holds no items.
	ErrEmptyQueue = errors.New("ErrEmptyQueue")
)
// sema is the pair of signals associated with one waiting consumer.
type sema struct {
	ready    chan bool       // buffered (capacity 1) wake-up signal
	response *sync.WaitGroup // lets the waker wait for the consumer's acknowledgement
}

// newSema returns a sema with a one-slot ready channel and a fresh WaitGroup.
func newSema() *sema {
	s := new(sema)
	s.ready = make(chan bool, 1)
	s.response = new(sync.WaitGroup)
	return s
}

// waiters is an ordered list of semas, oldest first.
type waiters []*sema

// get removes and returns the oldest sema, or returns nil when the list is
// empty. Example: with waiters [1, 2, 3, 4], get returns 1 and leaves [2, 3, 4].
func (w *waiters) get() *sema {
	list := *w
	if len(list) == 0 {
		return nil
	}
	oldest := list[0]
	last := len(list) - 1
	copy(list, list[1:])
	list[last] = nil // clear the vacated slot so it no longer references a sema
	*w = list[:last]
	return oldest
}

// put appends sema to the end of the list.
func (w *waiters) put(sema *sema) {
	*w = append(*w, sema)
}

// remove deletes every occurrence of sema by rebuilding the list without it.
// An empty list is left untouched.
func (w *waiters) remove(sema *sema) {
	current := *w
	if len(current) == 0 {
		return
	}
	kept := make(waiters, 0, len(current))
	for _, candidate := range current {
		if candidate == sema {
			continue
		}
		kept = append(kept, candidate)
	}
	*w = kept
}
// items is the queue's backing slice, head first.
type items []interface{}

// get removes up to number values from the head and returns them in order.
//
// With items = [1, 2, 3, 4, 5, 6, 7, 8]:
//   - number = 3 returns [1, 2, 3] and leaves [4, 5, 6, 7, 8]
//   - number = 20 returns all eight values and leaves an empty slice
//
// The result is sized by what is actually available, so a very large number
// does not trigger a huge (or panicking) allocation. A non-positive number
// returns an empty, non-nil slice.
func (its *items) get(number int64) []interface{} {
	n := len(*its)
	if number < int64(n) {
		n = int(number)
	}
	if n < 0 {
		n = 0
	}

	taken := make([]interface{}, n)
	copy(taken, (*its)[:n])
	for i := 0; i < n; i++ {
		(*its)[i] = nil // drop the reference held by the backing array
	}
	*its = (*its)[n:]
	return taken
}

// peek returns the head value and true, or (nil, false) when there are no
// values. The slice is not modified.
func (its *items) peek() (interface{}, bool) {
	if len(*its) == 0 {
		return nil, false
	}
	return (*its)[0], true
}

// getUntil removes values from the head for as long as checker returns true
// and returns them in order. It stops at the first value checker rejects;
// that value and everything after it stay queued. An empty slice yields an
// empty, non-nil result.
func (its *items) getUntil(checker func(item interface{}) bool) []interface{} {
	src := *its
	if len(src) == 0 {
		return []interface{}{}
	}

	taken := make([]interface{}, 0, len(src))
	for i, item := range src {
		if !checker(item) {
			break
		}
		taken = append(taken, item)
		src[i] = nil // prevent the backing array from pinning the value
	}
	*its = src[len(taken):]
	return taken
}
// Queue is a mutex-guarded FIFO queue whose consumers can block in Poll
// until items are added.
type Queue struct {
	waiters  waiters    // consumers currently parked in Poll, oldest first
	items    items      // queued values, head first
	lock     sync.Mutex // guards waiters, items and disposed
	disposed bool       // set by Dispose; later operations return ErrDisposed
}
// Put appends items to the tail of the queue.
//
//   - With no arguments it does nothing and returns nil.
//   - If the queue has been disposed it returns ErrDisposed.
//   - Otherwise the items are appended and parked consumers are woken one at
//     a time, oldest first. After each wake-up Put waits for that consumer to
//     finish taking items, and it stops as soon as the queue is empty again
//     so the remaining consumers stay parked instead of being released with
//     an empty result.
func (q *Queue) Put(items ...interface{}) error {
	if len(items) == 0 {
		return nil
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	if q.disposed {
		return ErrDisposed
	}

	q.items = append(q.items, items...)
	for {
		sema := q.waiters.get()
		if sema == nil {
			break
		}
		sema.response.Add(1)
		select {
		case sema.ready <- true:
			// The consumer reads q.items while we still hold the lock and
			// calls Done when it has finished.
			sema.response.Wait()
		default:
			// This semaphore timed out.
		}
		// Check the queue, not the argument: the argument is never empty
		// here, so testing it would keep waking consumers after the queue
		// has been drained.
		if len(q.items) == 0 {
			break
		}
	}
	return nil
}
// Get removes and returns up to number items from the head of the queue.
// If fewer than number items are queued, all of them are returned. If the
// queue is empty, Get blocks until items are added; it delegates to Poll
// with a zero timeout, so it never returns ErrTimeout.
func (q *Queue) Get(number int64) ([]interface{}, error) {
	taken, err := q.Poll(number, 0)
	return taken, err
}
// Poll removes and returns up to number items from the head of the queue.
//
//   - number < 1 returns an empty slice immediately.
//   - A disposed queue returns ErrDisposed.
//   - If items are queued, up to number of them are returned at once.
//   - If the queue is empty, Poll parks itself on the waiters list and blocks.
//     With timeout > 0 it gives up after that duration and returns
//     ErrTimeout; with timeout <= 0 the timeout channel stays nil, so it
//     blocks until Put or Dispose wakes it.
func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) {
if number < 1 {
// thanks again go
return []interface{}{}, nil
}
q.lock.Lock()
if q.disposed { // the queue has already been disposed
q.lock.Unlock()
return nil, ErrDisposed // nothing can be taken from a disposed queue
}
var items []interface{}
// The queue is empty: register as a waiter and block until woken or timed out.
if len(q.items) == 0 {
sema := newSema()
q.waiters.put(sema)
q.lock.Unlock()
// A nil channel never delivers, so without a positive timeout only
// sema.ready can end the select below.
var timeoutC <-chan time.Time
if timeout > 0 {
timeoutC = time.After(timeout)
}
select {
case <-sema.ready:
// we are now inside the put's lock
// (Put holds q.lock and blocks on sema.response until Done is called below.)
if q.disposed {
return nil, ErrDisposed
}
items = q.items.get(number)
sema.response.Done()
return items, nil
case <-timeoutC:
// cleanup the sema that was added to waiters
select {
case sema.ready <- true: // fill the one-slot channel ourselves so a later Put cannot signal it
// Put has not signalled this sema yet: remove it from the waiters list.
q.lock.Lock()
q.waiters.remove(sema)
q.lock.Unlock()
default:
// Put() got it already, we need to call Done() so Put() can move on
sema.response.Done()
}
return nil, ErrTimeout
}
}
// The queue is not empty: take up to number items from the head.
items = q.items.get(number)
q.lock.Unlock()
return items, nil
}
// Peek returns the head item without removing it.
//
// It returns (nil, ErrDisposed) when the queue has been disposed and
// (nil, ErrEmptyQueue) when no items are queued.
func (q *Queue) Peek() (interface{}, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.disposed {
		return nil, ErrDisposed
	}
	if head, ok := q.items.peek(); ok {
		return head, nil
	}
	return nil, ErrEmptyQueue
}
// TakeUntil removes items from the head of the queue for as long as checker
// returns true and returns them. It never waits: an empty queue yields an
// empty result. A nil checker returns (nil, nil); a disposed queue returns
// (nil, ErrDisposed).
func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
	if checker == nil {
		return nil, nil
	}

	var (
		taken []interface{}
		err   error
	)
	q.lock.Lock()
	if q.disposed {
		err = ErrDisposed
	} else {
		taken = q.items.getUntil(checker)
	}
	q.lock.Unlock()
	return taken, err
}
// Empty reports whether the queue currently holds no items.
func (q *Queue) Empty() bool {
	q.lock.Lock()
	count := len(q.items)
	q.lock.Unlock()
	return count == 0
}
// Len reports how many items are currently queued.
func (q *Queue) Len() int64 {
	q.lock.Lock()
	count := len(q.items)
	q.lock.Unlock()
	return int64(count)
}
// Disposed reports whether Dispose has been called on this queue.
func (q *Queue) Disposed() bool {
	q.lock.Lock()
	state := q.disposed
	q.lock.Unlock()
	return state
}
// Dispose marks the queue as disposed and returns whatever items were still
// queued. Every parked waiter is signalled (without waiting for it) so that
// a blocked Poll can return, and the item and waiter lists are set to nil.
// Later Put, Poll, Peek and TakeUntil calls return ErrDisposed.
func (q *Queue) Dispose() []interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.disposed = true
	for i := range q.waiters {
		parked := q.waiters[i]
		parked.response.Add(1)
		select {
		case parked.ready <- true:
			// release Poll immediately
		default:
			// ignore if it's a timeout or in the get
		}
	}

	leftover := q.items
	q.items, q.waiters = nil, nil
	return leftover
}
// New returns an empty, usable queue whose item slice is pre-allocated with
// capacity hint.
func New(hint int64) *Queue {
	q := new(Queue) // zero value: no waiters, unlocked mutex, not disposed
	q.items = make([]interface{}, 0, hint)
	return q
}
// ExecuteInParallel calls fn once for every item currently in q, spreading
// the calls over several goroutines, and disposes the queue afterwards, so q
// cannot be used again.
//
// A nil queue returns immediately; an empty queue returns without being
// disposed. The number of goroutines is runtime.NumCPU()-1, with a minimum
// of one. The queue lock is held from before the item count is read until
// all goroutines have finished, so the count and the slice always agree.
// fn must not call methods on q, because the lock is held while it runs.
func ExecuteInParallel(q *Queue, fn func(interface{})) {
	if q == nil {
		return
	}

	q.lock.Lock() // so no one touches anything in the middle of this process
	items := q.items
	todo := int64(len(items))
	// this is important or we might face an infinite loop
	if todo == 0 {
		q.lock.Unlock()
		return
	}

	numCPU := 1
	if n := runtime.NumCPU(); n > 1 {
		numCPU = n - 1
	}

	var wg sync.WaitGroup
	wg.Add(numCPU)
	done := int64(-1) // index of the last item claimed by any goroutine
	for i := 0; i < numCPU; i++ {
		go func() {
			defer wg.Done()
			for {
				// Atomically claim the next unprocessed index.
				index := atomic.AddInt64(&done, 1)
				if index >= todo {
					return
				}
				fn(items[index])
				items[index] = 0 // overwrite the processed slot
			}
		}()
	}
	wg.Wait()

	q.lock.Unlock()
	q.Dispose()
}
测试:
package main
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestPut checks that Put grows the queue and that Get returns the items in
// insertion order, leaving the queue empty.
func TestPut(t *testing.T) {
	q := New(10)

	q.Put(`test`)
	if n := q.Len(); n != 1 {
		t.Fatal("q.len must = 1 , but = ", n)
	}

	q.Put(`test1`)
	results, err := q.Get(3)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if first := results[0]; first != `test` {
		t.Fatal("result should = `test` , but = ", first)
	}
	if !q.Empty() {
		t.Fatal("q.Empty() is true, but hash ", q.Empty())
	}
}
// TestGet checks that Get returns at most the requested number of items, in
// order, and removes them from the queue.
func TestGet(t *testing.T) {
	q := New(10)

	// One item queued, two requested: Get returns what is available.
	q.Put(`test`)
	got, err := q.Get(2)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if n := q.Len(); n != 0 {
		t.Fatal("q.len must = 0 , but = ", n)
	}
	if n := len(got); n != 1 {
		t.Fatal("len(result) must = 1 , but = ", n)
	}
	if got[0] != `test` {
		t.Fatal("result should = `test` , but = ", got)
	}
	if n := q.Len(); n != 0 {
		t.Fatal("q.len must = 0 , but = ", n)
	}

	// Two items queued, one requested: only the head is taken.
	for _, v := range []string{`1`, `2`} {
		q.Put(v)
	}
	got, err = q.Get(1)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if n := len(got); n != 1 {
		t.Fatal("len(result) must = 1 , but = ", n)
	}
	if got[0] != `1` {
		t.Fatal("result should = `1` , but = ", got)
	}
	if n := q.Len(); n != 1 {
		t.Fatal("q.len must = 1 , but = ", n)
	}

	// The remaining item comes out next.
	got, err = q.Get(2)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if got[0] != `2` {
		t.Fatal("result should = `2` , but = ", got)
	}
}
// TestPoll checks that Poll times out on an empty queue without breaking
// later Puts, and that it returns queued items in order.
func TestPoll(t *testing.T) {
	q := New(10)

	// should be able to Poll() before anything is present, without breaking future Puts
	_, err := q.Poll(1, time.Microsecond)
	if err != ErrTimeout {
		t.Fatal("ErrTimeout but, ", err)
	}

	q.Put(`test`)
	result, err := q.Poll(2, 0)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if n := len(result); n != 1 {
		t.Fatal("len(result) must = 1 , but = ", n)
	}
	if result[0] != `test` {
		t.Fatal("result should = `test` , but = ", result)
	}
	if n := q.Len(); n != 0 {
		t.Fatal("q.len must = 0 , but = ", n)
	}

	for _, v := range []string{`1`, `2`} {
		q.Put(v)
	}
	result, err = q.Poll(1, time.Millisecond)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if result[0] != `1` {
		t.Fatal("result should = `1` , but = ", result)
	}
	if n := q.Len(); n != 1 {
		t.Fatal("q.len must = 1 , but = ", n)
	}
}
// TestPollNoMemoryLeak checks that a Poll which times out removes its own
// entry from the waiters list, so repeated timeouts do not accumulate
// waiters.
func TestPollNoMemoryLeak(t *testing.T) {
	q := New(0)
	if n := len(q.waiters); n != 0 {
		t.Fatal("len(q.waiters) must = 0 , but = ", n)
	}

	for i := 0; i < 10; i++ {
		// Poll() should cleanup waiters after timeout
		if _, err := q.Poll(1, time.Nanosecond); err != ErrTimeout {
			t.Fatal("Poll on an empty queue should time out, but got ", err)
		}
		// Report the waiter count itself, not the number of queued items.
		if n := len(q.waiters); n != 0 {
			t.Fatal("len(q.waiters) must = 0 , but = ", n)
		}
	}
}
// TestAddEmptyPut checks that Put with no arguments leaves the queue empty.
func TestAddEmptyPut(t *testing.T) {
	q := New(10)

	q.Put() // no items: Put returns immediately

	if got := q.Len(); got != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, got)
	}
}
// TestGetNonPositiveNumber checks that asking for zero items returns an
// empty result and no error, even when items are queued.
func TestGetNonPositiveNumber(t *testing.T) {
	q := New(10)
	q.Put(`test`)

	got, err := q.Get(0)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if n := len(got); n != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, n)
	}
}
// TestEmpty checks Empty before and after the first Put.
func TestEmpty(t *testing.T) {
	q := New(10)
	if empty := q.Empty(); !empty {
		t.Errorf(`Expected empty queue.`)
	}

	q.Put(`test`)
	if empty := q.Empty(); empty {
		t.Errorf(`Expected non-empty queue.`)
	}
}
// TestGetEmpty checks that Get on an empty queue receives an item that is
// Put concurrently from another goroutine.
func TestGetEmpty(t *testing.T) {
	q := New(10)

	go q.Put(`a`)

	got, err := q.Get(2)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if got[0] != `a` {
		t.Fatal("result should = `a` , but = ", got)
	}
	if n := q.Len(); n != 0 {
		t.Fatal("q.len must = 0 , but = ", n)
	}
}
// TestMultipleGetEmpty starts two consumers on an empty queue, then puts
// three items and checks that each consumer received exactly one of the
// first two items (`a` and `b`, in either order).
func TestMultipleGetEmpty(t *testing.T) {
	q := New(10)
	results := make([][]interface{}, 2)

	var started, finished sync.WaitGroup
	started.Add(len(results))
	finished.Add(len(results))
	for i := range results {
		go func(slot int) {
			// Always signal completion, even on failure, so the test
			// cannot hang in finished.Wait.
			defer finished.Done()
			started.Done()
			local, err := q.Get(1)
			if err != nil {
				// t.Fatal must only be called from the test goroutine.
				t.Errorf("q.Get has err , because %v", err)
				return
			}
			results[slot] = local
		}(i)
	}

	started.Wait()
	fmt.Println(results) // [[] []]
	q.Put(`a`, `b`, `c`)
	finished.Wait()
	fmt.Println(results) // [[a] [b]] or [[b] [a]]

	if len(results[0]) != 1 || len(results[1]) != 1 {
		t.Fatalf("each consumer should receive one item, got %v", results)
	}
	first, second := results[0][0], results[1][0]
	inOrder := first == `a` && second == `b`
	swapped := first == `b` && second == `a`
	if !inOrder && !swapped {
		t.Fatalf("the results should be a, b or b, a, got %v", results)
	}
}
// TestDispose checks what Dispose returns for an empty queue, a populated
// queue, and a queue that was already disposed.
func TestDispose(t *testing.T) {
	// when the queue is empty
	q := New(10)
	leftover := q.Dispose()
	if n := len(leftover); n != 0 {
		t.Errorf(`Expected len: %d, received: %d`, 0, n)
	}

	// when the queue is not empty
	q = New(10)
	q.Put(`1`)
	leftover = q.Dispose()
	want := []interface{}{`1`}
	if want[0] != leftover[0] {
		t.Errorf(`Expected len: %d, received: %d`, want, leftover)
	}

	// when the queue has been disposed
	if leftover = q.Dispose(); leftover != nil {
		t.Errorf(`Expected itemsDisposed: nil, received: %s`, leftover)
	}
}
// TestEmptyGetWithDispose checks that a Get blocked on an empty queue
// returns ErrDisposed once the queue is disposed.
func TestEmptyGetWithDispose(t *testing.T) {
q := New(10)
var wg sync.WaitGroup
wg.Add(1)
var err error
go func() {
// First Done: tells the test goroutine that this goroutine has started.
wg.Done()
_, err = q.Get(1) // finishes last: the queue is empty, so Get cannot return before Dispose runs
// Second Done: tells the test goroutine that Get has returned and err is set.
wg.Done()
}()
wg.Wait()
// Re-arm the WaitGroup before Dispose so the goroutine's second Done has a matching Add.
wg.Add(1)
q.Dispose() // runs first and releases the blocked Get
wg.Wait()
if err != ErrDisposed{
t.Fatal(err)
}
}
// TestDisposeAfterEmptyPoll checks that Dispose does not hang after a Poll
// has timed out, and that a later Poll reports ErrDisposed.
func TestDisposeAfterEmptyPoll(t *testing.T) {
	q := New(10)

	// Nothing is ever put, so this Poll blocks until its timeout expires.
	if _, err := q.Poll(1, time.Millisecond); err != ErrTimeout {
		t.Fatal(err)
	}

	// it should not hang
	q.Dispose()

	if _, err := q.Poll(1, time.Millisecond); err != ErrDisposed {
		t.Fatal(err)
	}
}
// BenchmarkQueue measures b.N single-item Puts against a consumer goroutine
// that performs b.N single-item Gets.
func BenchmarkQueue(b *testing.B) {
	q := New(int64(b.N))

	var wg sync.WaitGroup
	wg.Add(1)
	received := 0
	go func() {
		for {
			q.Get(1)
			received++
			// Stop once as many Gets as Puts have been performed.
			if received == b.N {
				wg.Done()
				return
			}
		}
	}()

	for n := 0; n < b.N; n++ {
		q.Put(n)
	}
	wg.Wait()
}
// BenchmarkChannel is the channel-based baseline for BenchmarkQueue: b.N
// sends on a one-slot buffered channel against a goroutine that performs
// b.N receives.
func BenchmarkChannel(b *testing.B) {
	ch := make(chan interface{}, 1)

	var wg sync.WaitGroup
	wg.Add(1)
	received := 0
	go func() {
		for {
			<-ch
			received++
			// Stop once every sent value has been received.
			if received == b.N {
				wg.Done()
				return
			}
		}
	}()

	for n := 0; n < b.N; n++ {
		ch <- `a`
	}
	wg.Wait()
}
// TestPeek checks that Peek returns the head item without removing it and
// that the following Get returns that same item.
func TestPeek(t *testing.T) {
	q := New(10)
	for _, v := range []string{`a`, `b`, `c`} {
		q.Put(v)
	}

	peekExpected := `a`
	peekResult, err := q.Peek()
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if n := q.Len(); n != 3 {
		t.Fatal("q.len must = 3 , but = ", n)
	}
	if peekExpected != peekResult {
		t.Errorf(`Expected len: %s, received: %s`, peekExpected, peekResult)
	}

	popResult, err := q.Get(1)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}
	if peekExpected != popResult[0] {
		t.Errorf(`Expected len: %d, received: %d`, peekResult, popResult[0])
	}
	if n := q.Len(); n != 2 {
		t.Fatal("q.len must = 2 , but = ", n)
	}
}
// TestPeekOnDisposedQueue checks that Peek on a disposed queue returns a nil
// item together with ErrDisposed.
func TestPeekOnDisposedQueue(t *testing.T) {
	q := New(10)
	q.Dispose()

	head, err := q.Peek()
	if head != nil {
		t.Fatal("当前items为nil,获取队首元素时应该结果返回nil,但是获取了 ", head)
	}
	if err != ErrDisposed {
		t.Fatal(err)
	}
}
// TestTakeUntil checks that TakeUntil returns the leading items accepted by
// the checker and stops at the first rejected one.
func TestTakeUntil(t *testing.T) {
	q := New(10)
	q.Put(`a`, `b`, `c`)

	notC := func(item interface{}) bool { return item != `c` }
	result, err := q.TakeUntil(notC)
	if err != nil {
		t.Fatal("q.TakeUntil ", err)
	}

	expected := []interface{}{`a`, `b`}
	for i := 0; i < 2; i++ {
		if expected[i] != result[i] {
			t.Errorf(`Expected : %s, received: %s`, expected, result)
			break
		}
	}
}
// TestTakeUntilThenGet checks that the items left behind by TakeUntil are
// still available to a following Get.
func TestTakeUntilThenGet(t *testing.T) {
	q := New(10)
	q.Put(`a`, `b`, `c`)

	takeItems, err := q.TakeUntil(func(item interface{}) bool {
		return item != `c`
	})
	if err != nil {
		t.Fatal("q.TakeUntil ", err)
	}
	restItems, err := q.Get(3)
	if err != nil {
		t.Fatal("q.Get has err , because ", err)
	}

	// The two items accepted by the checker were taken.
	// Lengths are checked first so a short result fails instead of panicking.
	if len(takeItems) != 2 || takeItems[0] != `a` || takeItems[1] != `b` {
		t.Errorf(`Expected : %s, received: %s`, []interface{}{`a`, `b`}, takeItems)
	}
	// Only `c` is left in the queue.
	if len(restItems) != 1 || restItems[0] != `c` {
		t.Errorf(`Expected : %s, received: %s`, []interface{}{`c`}, restItems)
	}
}
// TestWaiters exercises put, remove and get on the waiters list and asserts
// the exact list contents after every step.
func TestWaiters(t *testing.T) {
	s1, s2, s3, s4 := newSema(), newSema(), newSema(), newSema()

	// expect fails the test unless w holds exactly want, in order.
	expect := func(step string, w waiters, want ...*sema) {
		t.Helper()
		if len(w) != len(want) {
			t.Fatalf("%s: len(waiters) = %d, want %d", step, len(w), len(want))
		}
		for i := range want {
			if w[i] != want[i] {
				t.Fatalf("%s: waiters[%d] holds an unexpected sema", step, i)
			}
		}
	}

	w := waiters{}
	expect("new list", w)

	// put appends at the end
	w.put(s1)
	expect("put s1", w, s1)
	w.put(s2)
	w.put(s3)
	w.put(s4)
	expect("put s2..s4", w, s1, s2, s3, s4)

	// remove from the middle
	w.remove(s2)
	expect("remove s2", w, s1, s3, s4)
	// remove non-existing element
	w.remove(s2)
	expect("remove s2 again", w, s1, s3, s4)
	// remove from beginning
	w.remove(s1)
	expect("remove s1", w, s3, s4)
	// remove from end
	w.remove(s4)
	expect("remove s4", w, s3)
	// remove last element
	w.remove(s3)
	expect("remove s3", w)
	// remove non-existing element from an empty list
	w.remove(s3)
	expect("remove s3 again", w)

	// get() returns each item in insertion order
	w.put(s1)
	w.put(s2)
	w.put(s3)
	if got := w.get(); got != s1 {
		t.Fatal("first get() should return s1")
	}
	if got := w.get(); got != s2 {
		t.Fatal("second get() should return s2")
	}
	w.put(s4) // interleave a put(), item should go to the end
	if got := w.get(); got != s3 {
		t.Fatal("third get() should return s3")
	}
	if got := w.get(); got != s4 {
		t.Fatal("fourth get() should return s4")
	}
	expect("drained", w)
	if got := w.get(); got != nil {
		t.Fatal("get() on an empty list should return nil")
	}
}
// TestExecuteInParallel checks that ExecuteInParallel visits every queued
// item exactly once (the values 0..9 sum to 45) and disposes the queue.
func TestExecuteInParallel(t *testing.T) {
	q := New(10)
	for i := int64(0); i < 10; i++ {
		q.Put(i)
	}

	var numCalls int64
	addItem := func(item interface{}) {
		// t.Logf("ExecuteInParallel called us with %+v", item)
		atomic.AddInt64(&numCalls, item.(int64))
	}
	ExecuteInParallel(q, addItem)

	if !q.Disposed() {
		t.Fatal("q.items should nil, but", q.items)
	}
	t.Log(numCalls)
	if numCalls != int64(45) {
		t.Fatal("numCalls should 10, but", numCalls)
	}
}
// TestExecuteInParallelEmptyQueue checks that ExecuteInParallel returns on
// an empty queue without deadlocking and without invoking the callback.
func TestExecuteInParallelEmptyQueue(t *testing.T) {
	q := New(1)
	mustNotRun := func(interface{}) {
		t.Fail()
	}
	ExecuteInParallel(q, mustNotRun)
}
// BenchmarkQueuePut measures 1000 single-item Puts into a fresh queue per
// iteration. The queues are created before the timer is reset.
func BenchmarkQueuePut(b *testing.B) {
	const numItems = int64(1000)

	qs := make([]*Queue, b.N)
	for i := range qs {
		qs[i] = New(10)
	}

	b.ResetTimer()
	for _, q := range qs {
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
	}
}
// BenchmarkQueueGet measures 1000 single-item Gets from a pre-filled queue
// per iteration. The queues are filled before the timer is reset.
func BenchmarkQueueGet(b *testing.B) {
	const numItems = int64(1000)

	qs := make([]*Queue, b.N)
	for i := range qs {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs[i] = q
	}

	b.ResetTimer()
	for _, q := range qs {
		for j := int64(0); j < numItems; j++ {
			q.Get(1)
		}
	}
}
// BenchmarkQueuePoll measures 1000 single-item Polls (1ms timeout) from a
// pre-filled queue per iteration. The queues are filled before the timer is
// reset.
func BenchmarkQueuePoll(b *testing.B) {
	const numItems = int64(1000)

	qs := make([]*Queue, b.N)
	for i := range qs {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs[i] = q
	}

	b.ResetTimer()
	for i := 0; i < len(qs); i++ {
		for j := int64(0); j < numItems; j++ {
			qs[i].Poll(1, time.Millisecond)
		}
	}
}
// BenchmarkExecuteInParallel measures ExecuteInParallel over a pre-filled
// 1000-item queue per iteration; the callback atomically adds each item to
// a shared counter. The queues are filled before the timer is reset.
func BenchmarkExecuteInParallel(b *testing.B) {
	const numItems = int64(1000)

	qs := make([]*Queue, b.N)
	for i := range qs {
		q := New(numItems)
		for j := int64(0); j < numItems; j++ {
			q.Put(j)
		}
		qs[i] = q
	}

	var counter int64
	accumulate := func(ifc interface{}) {
		atomic.AddInt64(&counter, ifc.(int64))
	}

	b.ResetTimer()
	for _, q := range qs {
		ExecuteInParallel(q, accumulate)
	}
}