之前写过一篇文章《使用Golang实现的无锁队列,性能与Disruptor相当达到1400万/秒》,后来在 choleraehyq (Cholerae Hu) 的帮助下发现并修复了 go test -race 测试出错的BUG,并且性能非常理想;再后来受 Meltdown 和 Spectre 处理器漏洞修复补丁的影响,性能下降50%。

  最近花了一些时间升级了EsQueue,增加了 Puts() 和 Gets() 批量操作方法,经过测试,测试结果超级理想,源代码已更新到 yireyun/go-queue。

测试结果如下:

----批量尺寸为32使用Puts()和Gets()的测试结果如下----
使用Puts, Gets 进行测试,块尺寸32, 性能可以更快, 对应 pprof4
go1.8.3 amd64, Grp:   1, Times:    1500000, miss:     0, use:    908.254ms,    605ns/32op      18/op
go1.8.3 amd64, Grp:   2, Times:    3000000, miss:     0, use:   1.4947772s,    498ns/32op      15/op
go1.8.3 amd64, Grp:   3, Times:    3000000, miss:     0, use:   1.4490451s,    483ns/32op      15/op
go1.8.3 amd64, Grp:   4, Times:    4000000, miss:     0, use:   2.1125661s,    528ns/32op      16/op
go1.8.3 amd64, Grp:   5, Times:    5000000, miss:     0, use:   2.3802556s,    476ns/32op      14/op
go1.8.3 amd64, Grp:   6, Times:    3000000, miss:     0, use:   1.5050799s,    501ns/32op      15/op
go1.8.3 amd64, Grp:   7, Times:    3500000, miss:     0, use:   1.6807146s,    480ns/32op      15/op
go1.8.3 amd64, Grp:   8, Times:    4000000, miss:     0, use:   1.8279384s,    456ns/32op      14/op
go1.8.3 amd64, Grp:   9, Times:    3600000, miss:     0, use:   1.6087893s,    446ns/32op      13/op
go1.8.3 amd64, Grp:  10, Times:    4000000, miss:     0, use:   1.8343257s,    458ns/32op      14/op
go1.8.3 amd64, Grp:  11, Times:    4400000, miss:     0, use:   1.9333989s,    439ns/32op      13/op
go1.8.3 amd64, Grp:  12, Times:    3600000, miss:     0, use:   1.5931753s,    442ns/32op      13/op
go1.8.3 amd64, Grp:  13, Times:    3900000, miss:     0, use:   1.7232328s,    441ns/32op      13/op
go1.8.3 amd64, Grp:  14, Times:    4200000, miss:     0, use:   1.8263283s,    434ns/32op      13/op
go1.8.3 amd64, Grp:  15, Times:    3000000, miss:     0, use:   1.2999684s,    433ns/32op      13/op
go1.8.3 amd64, Grp:  16, Times:    3200000, miss:     0, use:   1.3860389s,    433ns/32op      13/op
go1.8.3 amd64, Grp: Sum, Times:   56900000, miss:     0, use:  26.5638885s,    466ns/32op      14/op


----批量尺寸为32使用Put()和Gets()的测试结果如下----
使用Put, Gets 进行测试,块尺寸32, 性能提升不多, 对应 pprof5
go1.10.3 amd64, Grp:   1, Times:   48000000, miss:     0, use:   2.0869798s,     43ns/op
go1.10.3 amd64, Grp:   2, Times:   96000000, miss:     0, use:    4.253081s,     44ns/op
go1.10.3 amd64, Grp:   3, Times:   76800000, miss:     0, use:    3.448918s,     44ns/op
go1.10.3 amd64, Grp:   4, Times:  102400000, miss:     0, use:   4.5190001s,     44ns/op
go1.10.3 amd64, Grp:   5, Times:  128000000, miss:     0, use:   5.7560188s,     44ns/op
go1.10.3 amd64, Grp:   6, Times:   96000000, miss:     0, use:   4.0059824s,     41ns/op
go1.10.3 amd64, Grp:   7, Times:  112000000, miss:     0, use:   5.1509986s,     45ns/op
go1.10.3 amd64, Grp:   8, Times:  128000000, miss:     0, use:   5.5909991s,     43ns/op
go1.10.3 amd64, Grp:   9, Times:  115200000, miss:     0, use:   5.2770015s,     45ns/op
go1.10.3 amd64, Grp:  10, Times:  128000000, miss:     0, use:   5.5629135s,     43ns/op
go1.10.3 amd64, Grp:  11, Times:  140800000, miss:     0, use:   6.4389843s,     45ns/op
go1.10.3 amd64, Grp:  12, Times:  115200000, miss:     0, use:   5.1078895s,     44ns/op
go1.10.3 amd64, Grp:  13, Times:  124800000, miss:     0, use:   5.7341286s,     45ns/op
go1.10.3 amd64, Grp:  14, Times:  134400000, miss:     0, use:   5.8718558s,     43ns/op
go1.10.3 amd64, Grp:  15, Times:   96000000, miss:     0, use:   4.4031446s,     45ns/op
go1.10.3 amd64, Grp:  16, Times:  102400000, miss:     0, use:   4.4728723s,     43ns/op
go1.10.3 amd64, Grp: Sum, Times: 1744000000, miss:     0, use: 1m17.6807679s,     44ns/op


源代码如下:

// Package queue implements EsQueue, a fixed-capacity queue built on atomic operations.
package queue

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// esCache is one slot of the queue's ring buffer.
//
// putNo and getNo are per-slot sequence numbers. A producer stores into the
// slot only when both equal the position it reserved; a consumer loads from
// it only when getNo equals its reserved position and putNo is exactly one
// queue capacity ahead of getNo. Each side adds the queue capacity to its own
// number when it is done, which hands the slot over to the other side.
type esCache struct {
	putNo, getNo uint32
	value        interface{}
}

// EsQueue is a fixed-capacity ring-buffer queue that is coordinated with
// atomic operations instead of a mutex (its author labels it "lock free").
type EsQueue struct {
	// capaciity is the number of slots in cache; capMod is the mask applied
	// to a position to obtain its slot index.
	capaciity, capMod uint32

	// putPos and getPos are the positions most recently reserved by the put
	// and get methods respectively.
	putPos, getPos uint32

	// cache holds the slots; a position maps to cache[position&capMod].
	cache []esCache
}

// NewQueue returns an empty queue whose capacity is the requested value
// rounded up to a power of two, with a minimum of 4 slots.
//
// The minimum exists because smaller rings are unusable with this
// implementation: Put reports "full" once capMod-1 elements are queued, which
// is 0 for a 2-slot ring (every Put is rejected) and underflows for a 1-slot
// ring (Put never reports full), while a 0-slot ring makes the seeding of
// cache[0] below panic.
//
// Requests above 1<<31 cannot be rounded up within uint32 and are not
// supported.
func NewQueue(capaciity uint32) *EsQueue {
	const minCapacity = 4
	if capaciity < minCapacity {
		capaciity = minCapacity
	}

	q := new(EsQueue)
	q.capaciity = minQuantity(capaciity)
	q.capMod = q.capaciity - 1
	q.putPos = 0
	q.getPos = 0
	q.cache = make([]esCache, q.capaciity)

	// Seed every slot with the first position that maps onto it.
	for i := range q.cache {
		cache := &q.cache[i]
		cache.getNo = uint32(i)
		cache.putNo = uint32(i)
	}

	// Positions start at 1 (both counters start at 0 and are incremented
	// before use), so slot 0 is first reached at position capaciity.
	cache := &q.cache[0]
	cache.getNo = q.capaciity
	cache.putNo = q.capaciity
	return q
}

// String formats the queue's capacity, index mask and an atomically loaded
// snapshot of both positions. It satisfies fmt.Stringer.
func (q *EsQueue) String() string {
	const layout = "Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}"

	consumed := atomic.LoadUint32(&q.getPos)
	produced := atomic.LoadUint32(&q.putPos)
	return fmt.Sprintf(layout, q.capaciity, q.capMod, produced, consumed)
}

// Capaciity returns the number of slots in the queue, i.e. the capacity that
// NewQueue stored after rounding the requested size up to a power of two.
// The spelling is kept because the method name is part of the exported API.
func (q *EsQueue) Capaciity() uint32 {
	return q.capaciity
}

// Quantity returns an estimate of the number of queued elements, computed
// from one atomic load of each position. Under concurrent use the value may
// already be out of date when it is returned.
//
// It assumes fewer than 1<<31 elements are queued.
func (q *EsQueue) Quantity() uint32 {
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)

	// Both positions are free-running uint32 counters, so the unsigned
	// difference is the element count even after putPos has wrapped past
	// 1<<32 while getPos has not.
	quantity := putPos - getPos

	// A negative signed difference means getPos is ahead of putPos (slots
	// were reserved for reading before they were written): nothing is queued.
	if int32(quantity) < 0 {
		return 0
	}
	return quantity
}

// Put tries to append val to the queue.
//
// It returns ok=false without storing anything when the queue looks full
// (capMod-1 or more elements queued) or when another goroutine changed putPos
// between the load and the compare-and-swap; the caller is expected to retry.
// quantity is the element count estimated by this call, including val on
// success. The estimate assumes fewer than 1<<31 queued elements.
func (q *EsQueue) Put(val interface{}) (ok bool, quantity uint32) {
	capMod := q.capMod

	// getPos is read first: if it is stale the count is over-estimated,
	// which can only make Put refuse, never over-fill.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)

	// Both positions are free-running uint32 counters, so the unsigned
	// difference stays correct after putPos wraps past 1<<32. A negative
	// signed difference means consumers have reserved slots that have not
	// been written yet; nothing is queued, so the put must be allowed.
	posCnt := putPos - getPos
	if int32(posCnt) < 0 {
		posCnt = 0
	}

	if posCnt >= capMod-1 {
		runtime.Gosched()
		return false, posCnt
	}

	// Reserve the next position.
	putPosNew := putPos + 1
	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
		runtime.Gosched()
		return false, posCnt
	}

	cache := &q.cache[putPosNew&capMod]

	// Wait until the slot's sequence numbers show that it is free for this
	// position, then store and advance putNo to publish the value.
	for {
		getNo := atomic.LoadUint32(&cache.getNo)
		putNo := atomic.LoadUint32(&cache.putNo)
		if putPosNew == putNo && getNo == putNo {
			cache.value = val
			atomic.AddUint32(&cache.putNo, q.capaciity)
			return true, posCnt + 1
		}
		runtime.Gosched()
	}
}

// Puts tries to append a batch of values to the queue.
//
// It stores at most capaciity minus the estimated element count values, taken
// from the front of values, and returns how many were stored. It returns
// puts=0 when the queue looks full (capMod-1 or more elements queued) or when
// another goroutine changed putPos between the load and the compare-and-swap;
// the caller is expected to retry. quantity is the element count estimated by
// this call, including the stored values. The estimate assumes fewer than
// 1<<31 queued elements.
func (q *EsQueue) Puts(values []interface{}) (puts, quantity uint32) {
	capMod := q.capMod

	// getPos is read first: if it is stale the count is over-estimated,
	// which can only make Puts store less, never over-fill.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)

	// Both positions are free-running uint32 counters, so the unsigned
	// difference stays correct after putPos wraps past 1<<32. A negative
	// signed difference means consumers have reserved slots that have not
	// been written yet; nothing is queued, so the put must be allowed.
	posCnt := putPos - getPos
	if int32(posCnt) < 0 {
		posCnt = 0
	}

	if posCnt >= capMod-1 {
		runtime.Gosched()
		return 0, posCnt
	}

	// Clamp the batch to the free space.
	putCnt := uint32(len(values))
	if capPuts := q.capaciity - posCnt; putCnt > capPuts {
		putCnt = capPuts
	}

	// Reserve putCnt consecutive positions in one step.
	putPosNew := putPos + putCnt
	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
		runtime.Gosched()
		return 0, posCnt
	}

	for posNew, v := putPos+1, uint32(0); v < putCnt; posNew, v = posNew+1, v+1 {
		cache := &q.cache[posNew&capMod]
		// Wait until the slot is free for this position, then store and
		// advance putNo to publish the value.
		for {
			getNo := atomic.LoadUint32(&cache.getNo)
			putNo := atomic.LoadUint32(&cache.putNo)
			if posNew == putNo && getNo == putNo {
				cache.value = values[v]
				atomic.AddUint32(&cache.putNo, q.capaciity)
				break
			}
			runtime.Gosched()
		}
	}
	return putCnt, posCnt + putCnt
}

// Get tries to remove the oldest element from the queue.
//
// It returns ok=false and a nil val when the queue looks empty or when
// another goroutine changed getPos between the load and the compare-and-swap;
// the caller is expected to retry. quantity is the element count estimated by
// this call, excluding the returned element on success. The estimate assumes
// fewer than 1<<31 queued elements.
func (q *EsQueue) Get() (val interface{}, ok bool, quantity uint32) {
	capMod := q.capMod

	// putPos is read first: if it is stale the count is under-estimated,
	// which can only make Get refuse, never reserve an unwritten position.
	putPos := atomic.LoadUint32(&q.putPos)
	getPos := atomic.LoadUint32(&q.getPos)

	// Both positions are free-running uint32 counters, so the unsigned
	// difference stays correct after putPos wraps past 1<<32. A negative
	// signed difference means getPos moved past the putPos value loaded
	// above; treat the queue as empty instead of reserving a position that
	// no producer has claimed.
	posCnt := putPos - getPos
	if int32(posCnt) < 0 {
		posCnt = 0
	}

	if posCnt < 1 {
		runtime.Gosched()
		return nil, false, posCnt
	}

	// Reserve the next position.
	getPosNew := getPos + 1
	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
		runtime.Gosched()
		return nil, false, posCnt
	}

	cache := &q.cache[getPosNew&capMod]

	// Wait until the producer of this position has published its value,
	// then take it, clear the slot and advance getNo to free the slot.
	for {
		getNo := atomic.LoadUint32(&cache.getNo)
		putNo := atomic.LoadUint32(&cache.putNo)
		if getPosNew == getNo && getNo == putNo-q.capaciity {
			val = cache.value
			cache.value = nil
			atomic.AddUint32(&cache.getNo, q.capaciity)
			return val, true, posCnt - 1
		}
		runtime.Gosched()
	}
}

// Gets tries to remove a batch of elements from the queue into values.
//
// It removes at most len(values) elements, oldest first, writes them to the
// front of values and returns how many were written. It returns gets=0 when
// the queue looks empty or when another goroutine changed getPos between the
// load and the compare-and-swap; the caller is expected to retry. quantity is
// the element count estimated by this call, excluding the removed elements.
// The estimate assumes fewer than 1<<31 queued elements.
func (q *EsQueue) Gets(values []interface{}) (gets, quantity uint32) {
	capMod := q.capMod

	// putPos is read first: if it is stale the count is under-estimated,
	// which can only make Gets take less, never reserve unwritten positions.
	putPos := atomic.LoadUint32(&q.putPos)
	getPos := atomic.LoadUint32(&q.getPos)

	// Both positions are free-running uint32 counters, so the unsigned
	// difference stays correct after putPos wraps past 1<<32. A negative
	// signed difference means getPos moved past the putPos value loaded
	// above; treat the queue as empty instead of reserving positions that
	// no producer has claimed.
	posCnt := putPos - getPos
	if int32(posCnt) < 0 {
		posCnt = 0
	}

	if posCnt < 1 {
		runtime.Gosched()
		return 0, posCnt
	}

	// Clamp the batch to what is queued.
	getCnt := uint32(len(values))
	if posCnt < getCnt {
		getCnt = posCnt
	}

	// Reserve getCnt consecutive positions in one step.
	getPosNew := getPos + getCnt
	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
		runtime.Gosched()
		return 0, posCnt
	}

	for posNew, v := getPos+1, uint32(0); v < getCnt; posNew, v = posNew+1, v+1 {
		cache := &q.cache[posNew&capMod]
		// Wait until the producer of this position has published its
		// value, then take it, clear the slot and advance getNo.
		for {
			getNo := atomic.LoadUint32(&cache.getNo)
			putNo := atomic.LoadUint32(&cache.putNo)
			if posNew == getNo && getNo == putNo-q.capaciity {
				values[v] = cache.value
				cache.value = nil
				atomic.AddUint32(&cache.getNo, q.capaciity)
				break
			}
			runtime.Gosched()
		}
	}

	return getCnt, posCnt - getCnt
}

// minQuantity rounds v up to the nearest power of two; a value that already
// is a power of two is returned unchanged.
//
// 0 is mapped to 1, the smallest power of two. Without that guard the
// decrement below would wrap to 0xFFFFFFFF and the function would return 0.
// Values above 1<<31 have no power-of-two ceiling in uint32 and yield 0.
func minQuantity(v uint32) uint32 {
	if v == 0 {
		return 1
	}
	// Smear the highest set bit of v-1 into every lower bit, then add one.
	v--
	v |= v >> 1
	v |= v >> 2
	v |= v >> 4
	v |= v >> 8
	v |= v >> 16
	v++
	return v
}