之前写过一篇文章《使用Golang实现的无锁队列,性能与Disruptor相当达到1400万/秒》,后来在 choleraehyq (Cholerae Hu) 的帮助下发现并修复了 go test -race 测试报错的 BUG,并且性能非常理想;再后来受 Meltdown 和 Spectre 处理器漏洞修复补丁的影响,性能下降了 50%。
最近花了一些时间升级了 EsQueue,增加了 Puts() 和 Gets() 批量操作方法,经过测试,测试结果超级理想,源代码已更新到 yireyun/go-queue。
测试结果如下:
----批量尺寸为32使用Puts()和Gets()的测试结果如下----
使用Puts, Gets 进行测试,块尺寸32, 性能可以更快, 对应 pprof4
go1.8.3 amd64, Grp: 1, Times: 1500000, miss: 0, use: 908.254ms, 605ns/32op 18/op
go1.8.3 amd64, Grp: 2, Times: 3000000, miss: 0, use: 1.4947772s, 498ns/32op 15/op
go1.8.3 amd64, Grp: 3, Times: 3000000, miss: 0, use: 1.4490451s, 483ns/32op 15/op
go1.8.3 amd64, Grp: 4, Times: 4000000, miss: 0, use: 2.1125661s, 528ns/32op 16/op
go1.8.3 amd64, Grp: 5, Times: 5000000, miss: 0, use: 2.3802556s, 476ns/32op 14/op
go1.8.3 amd64, Grp: 6, Times: 3000000, miss: 0, use: 1.5050799s, 501ns/32op 15/op
go1.8.3 amd64, Grp: 7, Times: 3500000, miss: 0, use: 1.6807146s, 480ns/32op 15/op
go1.8.3 amd64, Grp: 8, Times: 4000000, miss: 0, use: 1.8279384s, 456ns/32op 14/op
go1.8.3 amd64, Grp: 9, Times: 3600000, miss: 0, use: 1.6087893s, 446ns/32op 13/op
go1.8.3 amd64, Grp: 10, Times: 4000000, miss: 0, use: 1.8343257s, 458ns/32op 14/op
go1.8.3 amd64, Grp: 11, Times: 4400000, miss: 0, use: 1.9333989s, 439ns/32op 13/op
go1.8.3 amd64, Grp: 12, Times: 3600000, miss: 0, use: 1.5931753s, 442ns/32op 13/op
go1.8.3 amd64, Grp: 13, Times: 3900000, miss: 0, use: 1.7232328s, 441ns/32op 13/op
go1.8.3 amd64, Grp: 14, Times: 4200000, miss: 0, use: 1.8263283s, 434ns/32op 13/op
go1.8.3 amd64, Grp: 15, Times: 3000000, miss: 0, use: 1.2999684s, 433ns/32op 13/op
go1.8.3 amd64, Grp: 16, Times: 3200000, miss: 0, use: 1.3860389s, 433ns/32op 13/op
go1.8.3 amd64, Grp: Sum, Times: 56900000, miss: 0, use: 26.5638885s, 466ns/32op 14/op
----批量尺寸为32使用Put()和Gets()的测试结果如下----
使用Put, Gets 进行测试,块尺寸32, 性能提升不多, 对应 pprof5
go1.10.3 amd64, Grp: 1, Times: 48000000, miss: 0, use: 2.0869798s, 43ns/op
go1.10.3 amd64, Grp: 2, Times: 96000000, miss: 0, use: 4.253081s, 44ns/op
go1.10.3 amd64, Grp: 3, Times: 76800000, miss: 0, use: 3.448918s, 44ns/op
go1.10.3 amd64, Grp: 4, Times: 102400000, miss: 0, use: 4.5190001s, 44ns/op
go1.10.3 amd64, Grp: 5, Times: 128000000, miss: 0, use: 5.7560188s, 44ns/op
go1.10.3 amd64, Grp: 6, Times: 96000000, miss: 0, use: 4.0059824s, 41ns/op
go1.10.3 amd64, Grp: 7, Times: 112000000, miss: 0, use: 5.1509986s, 45ns/op
go1.10.3 amd64, Grp: 8, Times: 128000000, miss: 0, use: 5.5909991s, 43ns/op
go1.10.3 amd64, Grp: 9, Times: 115200000, miss: 0, use: 5.2770015s, 45ns/op
go1.10.3 amd64, Grp: 10, Times: 128000000, miss: 0, use: 5.5629135s, 43ns/op
go1.10.3 amd64, Grp: 11, Times: 140800000, miss: 0, use: 6.4389843s, 45ns/op
go1.10.3 amd64, Grp: 12, Times: 115200000, miss: 0, use: 5.1078895s, 44ns/op
go1.10.3 amd64, Grp: 13, Times: 124800000, miss: 0, use: 5.7341286s, 45ns/op
go1.10.3 amd64, Grp: 14, Times: 134400000, miss: 0, use: 5.8718558s, 43ns/op
go1.10.3 amd64, Grp: 15, Times: 96000000, miss: 0, use: 4.4031446s, 45ns/op
go1.10.3 amd64, Grp: 16, Times: 102400000, miss: 0, use: 4.4728723s, 43ns/op
go1.10.3 amd64, Grp: Sum, Times: 1744000000, miss: 0, use: 1m17.6807679s, 44ns/op
源代码如下:
// esQueue
package queue
import (
"fmt"
"runtime"
"sync/atomic"
)
// esCache is one slot of the queue's ring buffer.
//
// putNo and getNo are per-slot tickets compared against the position a
// producer or consumer has claimed: a producer stores into the slot only when
// both tickets equal its position, and a consumer reads it only when getNo
// equals its position and putNo is exactly one capacity ahead of getNo.
// value holds the queued item and is reset to nil once it has been read.
type esCache struct {
	putNo, getNo uint32
	value        interface{}
}
// EsQueue is a fixed-size ring-buffer queue whose producers and consumers
// coordinate through atomic operations instead of a mutex (the author
// describes it as a lock free queue).
//
// capaciity is the number of slots and capMod is capaciity-1, used as the
// index mask. putPos and getPos are the most recently claimed producer and
// consumer positions; they are only ever read and advanced atomically.
type EsQueue struct {
	capaciity, capMod uint32
	putPos, getPos    uint32
	cache             []esCache
}
// NewQueue allocates a queue whose slot count is the requested capaciity
// passed through minQuantity.
//
// Every slot i starts with both tickets equal to i, except slot 0: positions
// are claimed starting at 1, so the first position that maps to slot 0 is the
// slot count itself, and slot 0's tickets start at that value.
func NewQueue(capaciity uint32) *EsQueue {
	size := minQuantity(capaciity)
	slots := make([]esCache, size)
	for i := uint32(1); i < size; i++ {
		slots[i].getNo, slots[i].putNo = i, i
	}
	slots[0].getNo, slots[0].putNo = size, size
	return &EsQueue{
		capaciity: size,
		capMod:    size - 1,
		cache:     slots,
	}
}
// String renders the queue's capacity, index mask and the current producer
// and consumer positions. The two positions are read with separate atomic
// loads (getPos first), so they are not a single consistent snapshot.
func (q *EsQueue) String() string {
	const layout = "Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}"
	get := atomic.LoadUint32(&q.getPos)
	put := atomic.LoadUint32(&q.putPos)
	return fmt.Sprintf(layout, q.capaciity, q.capMod, put, get)
}
// Capaciity returns the number of slots in the queue's ring buffer, i.e. the
// value NewQueue obtained by passing the requested capacity through
// minQuantity. (The spelling is part of the existing exported API.)
func (q *EsQueue) Capaciity() uint32 {
return q.capaciity
}
// Quantity returns the number of positions claimed by producers that have not
// yet been claimed by consumers, i.e. putPos - getPos.
//
// putPos and getPos are free-running uint32 counters that wrap at 2^32, so
// the unsigned difference is already the correct distance even when putPos
// has wrapped and is numerically smaller than getPos; no capMod correction
// must be added. getPos is loaded first so the putPos it is subtracted from
// is never older than it. The two loads are separate, so under concurrent use
// the result is only an approximation of the instantaneous length.
func (q *EsQueue) Quantity() uint32 {
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	return putPos - getPos
}
// Put tries to enqueue val.
//
// It returns ok=false, after yielding with runtime.Gosched, when the observed
// item count has reached capMod-1 (the queue is treated as full) or when
// another producer advanced putPos first; the caller is expected to retry.
// On success it returns ok=true. quantity is the item count observed on
// entry, plus one when the value was stored.
//
// After a position has been claimed, Put spins (yielding each round) until the
// target slot's tickets show that the slot is free for exactly that position.
func (q *EsQueue) Put(val interface{}) (ok bool, quantity uint32) {
	capMod := q.capMod

	// getPos is read first, so it can only be stale in the direction that
	// over-estimates the count. The counters are free-running and wrap at
	// 2^32; unsigned subtraction gives the right distance across the wrap.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	posCnt := putPos - getPos

	if posCnt >= capMod-1 {
		runtime.Gosched()
		return false, posCnt
	}

	putPosNew := putPos + 1
	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
		runtime.Gosched()
		return false, posCnt
	}

	cache := &q.cache[putPosNew&capMod]
	for {
		getNo := atomic.LoadUint32(&cache.getNo)
		putNo := atomic.LoadUint32(&cache.putNo)
		if putPosNew == putNo && getNo == putNo {
			cache.value = val
			// Advancing putNo by one capacity publishes the value to the
			// consumer that claims this position.
			atomic.AddUint32(&cache.putNo, q.capaciity)
			return true, posCnt + 1
		}
		runtime.Gosched()
	}
}
// Puts tries to enqueue a batch of values in one claim.
//
// It returns puts=0, after yielding with runtime.Gosched, when the observed
// item count has reached capMod-1 (the queue is treated as full) or when
// another producer advanced putPos first. Otherwise it claims
// min(len(values), capaciity-count) consecutive positions, stores
// values[0:puts] into them in order and returns how many were stored.
// quantity is the item count observed on entry plus puts.
//
// For each claimed position Puts spins (yielding each round) until the target
// slot's tickets show that the slot is free for exactly that position.
func (q *EsQueue) Puts(values []interface{}) (puts, quantity uint32) {
	capMod := q.capMod

	// getPos is read first, so it can only be stale in the direction that
	// over-estimates the count. The counters are free-running and wrap at
	// 2^32; unsigned subtraction gives the right distance across the wrap.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	posCnt := putPos - getPos

	if posCnt >= capMod-1 {
		runtime.Gosched()
		return 0, posCnt
	}

	// Never claim more positions than there are free slots.
	putCnt := uint32(len(values))
	if capPuts := q.capaciity - posCnt; capPuts < putCnt {
		putCnt = capPuts
	}

	putPosNew := putPos + putCnt
	if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
		runtime.Gosched()
		return 0, posCnt
	}

	for posNew, v := putPos+1, uint32(0); v < putCnt; posNew, v = posNew+1, v+1 {
		cache := &q.cache[posNew&capMod]
		for {
			getNo := atomic.LoadUint32(&cache.getNo)
			putNo := atomic.LoadUint32(&cache.putNo)
			if posNew == putNo && getNo == putNo {
				cache.value = values[v]
				// Advancing putNo by one capacity publishes the value to
				// the consumer that claims this position.
				atomic.AddUint32(&cache.putNo, q.capaciity)
				break
			}
			runtime.Gosched()
		}
	}
	return putCnt, posCnt + putCnt
}
// Get tries to dequeue one value.
//
// It returns ok=false with a nil val, after yielding with runtime.Gosched,
// when no item is observed or when another consumer advanced getPos first;
// the caller is expected to retry. On success it returns the value and
// ok=true. quantity is the item count observed on entry, minus one when a
// value was taken.
//
// After a position has been claimed, Get spins (yielding each round) until the
// slot's tickets show that the producer of that position has published it.
func (q *EsQueue) Get() (val interface{}, ok bool, quantity uint32) {
	capMod := q.capMod

	// getPos must be read before putPos: the putPos value is then never older
	// than the getPos it is compared with, so the difference cannot claim
	// items that were never produced. The counters are free-running and wrap
	// at 2^32; unsigned subtraction gives the right distance across the wrap.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	posCnt := putPos - getPos

	if posCnt < 1 {
		runtime.Gosched()
		return nil, false, posCnt
	}

	getPosNew := getPos + 1
	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
		runtime.Gosched()
		return nil, false, posCnt
	}

	cache := &q.cache[getPosNew&capMod]
	for {
		getNo := atomic.LoadUint32(&cache.getNo)
		putNo := atomic.LoadUint32(&cache.putNo)
		if getPosNew == getNo && getNo == putNo-q.capaciity {
			val = cache.value
			cache.value = nil // drop the reference so the slot does not pin the item
			// Advancing getNo by one capacity hands the slot to the
			// producer of the next lap.
			atomic.AddUint32(&cache.getNo, q.capaciity)
			return val, true, posCnt - 1
		}
		runtime.Gosched()
	}
}
// Gets tries to dequeue up to len(values) items in one claim.
//
// It returns gets=0, after yielding with runtime.Gosched, when no item is
// observed or when another consumer advanced getPos first. Otherwise it
// claims min(len(values), count) consecutive positions, copies their values
// into values[0:gets] in order and returns how many were copied. quantity is
// the item count observed on entry minus gets.
//
// For each claimed position Gets spins (yielding each round) until the slot's
// tickets show that the producer of that position has published it.
func (q *EsQueue) Gets(values []interface{}) (gets, quantity uint32) {
	capMod := q.capMod

	// getPos must be read before putPos: the putPos value is then never older
	// than the getPos it is compared with, so the difference cannot claim
	// items that were never produced. The counters are free-running and wrap
	// at 2^32; unsigned subtraction gives the right distance across the wrap.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)
	posCnt := putPos - getPos

	if posCnt < 1 {
		runtime.Gosched()
		return 0, posCnt
	}

	// Take no more than the caller's buffer can hold.
	getCnt := uint32(len(values))
	if posCnt < getCnt {
		getCnt = posCnt
	}

	getPosNew := getPos + getCnt
	if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
		runtime.Gosched()
		return 0, posCnt
	}

	for posNew, v := getPos+1, uint32(0); v < getCnt; posNew, v = posNew+1, v+1 {
		cache := &q.cache[posNew&capMod]
		for {
			getNo := atomic.LoadUint32(&cache.getNo)
			putNo := atomic.LoadUint32(&cache.putNo)
			if posNew == getNo && getNo == putNo-q.capaciity {
				values[v] = cache.value
				cache.value = nil // drop the reference so the slot does not pin the item
				// Advancing getNo by one capacity hands the slot to the
				// producer of the next lap.
				atomic.AddUint32(&cache.getNo, q.capaciity)
				break
			}
			runtime.Gosched()
		}
	}
	return getCnt, posCnt - getCnt
}
// minQuantity rounds v up to the nearest power of two (not merely a multiple
// of two); a value that already is a power of two is returned unchanged.
//
// Two inputs have no representable answer from the bit-smearing trick alone
// and would otherwise come back as 0: v == 0 yields 1, the smallest power of
// two, and any v above 1<<31 saturates at 1<<31, the largest power of two a
// uint32 can hold.
func minQuantity(v uint32) uint32 {
	const maxPow2 = 1 << 31
	if v == 0 {
		return 1
	}
	if v > maxPow2 {
		return maxPow2
	}
	// Smear the highest set bit of v-1 into every lower bit, then add one.
	v--
	v |= v >> 1
	v |= v >> 2
	v |= v >> 4
	v |= v >> 8
	v |= v >> 16
	v++
	return v
}