前一阵子看到一篇文章《美团高性能队列——Disruptor》,当时自己琢磨了一下;经过反复修改,实现了一个类似的无锁队列 EsQueue。该无锁队列相对 Disruptor 而言,少了对队列数量属性 quantity 的 CAS 操作,因此性能杠杠的。
在测试环境(Windows 10,Core(TM) i5-3320M CPU 2.6GHz,8GB 内存,go1.7.4)下,性能达到 1360 万次/秒。现在把代码发布出来,请同行验证一下,代码如下:
// esQueue
package esQueue
import (
"runtime"
"sync/atomic"
)
// esCache is one slot of the queue's backing slice.
type esCache struct {
// value holds the element written by Put until Get reads it.
value interface{}
// mark is set to true by Put after value has been written and reset to
// false by Get after value has been read. Both sides access it with plain
// (non-atomic) loads and stores.
mark bool
}
// EsQueue is a bounded queue backed by a fixed-size slice of slots. Put and
// Get reserve a position with a compare-and-swap on putPos / getPos rather
// than taking a mutex, then hand the element over through the slot's mark flag.
type EsQueue struct {
// capaciity is the number of slots in cache; NewQueue derives it from
// minQuantity. (The spelling is kept because every method uses this name.)
capaciity uint32
// capMod is capaciity - 1. Put and Get AND a position with it to obtain a
// slot index, and Put treats capMod stored elements as "full".
capMod uint32
// putPos is the position most recently reserved by Put.
putPos uint32
// getPos is the position most recently reserved by Get.
getPos uint32
// cache holds the slots.
cache []esCache
}
// NewQueue returns an empty queue whose slot count is capaciity rounded up to
// a power of two by minQuantity.
//
// The requested capacity is clamped to the range [2, 1<<31] before rounding:
//   - 0 would produce a zero-length slot slice with capMod == 0xFFFFFFFF, so
//     the first Put would index out of range;
//   - 1 would produce capMod == 0, and because Put reports "full" once capMod
//     elements are stored, such a queue could never accept an element;
//   - values above 1<<31 overflow minQuantity to 0, which is the first case
//     again.
//
// A queue with N slots stores at most N-1 elements.
func NewQueue(capaciity uint32) *EsQueue {
	const (
		minCapacity = 2
		maxCapacity = 1 << 31
	)
	switch {
	case capaciity < minCapacity:
		capaciity = minCapacity
	case capaciity > maxCapacity:
		capaciity = maxCapacity
	}

	size := minQuantity(capaciity)
	return &EsQueue{
		capaciity: size,
		capMod:    size - 1, // size is a power of two, so size-1 is an index mask
		cache:     make([]esCache, size),
	}
}
// Capaciity returns the number of slots allocated for the queue, i.e. the
// value NewQueue stored after rounding with minQuantity. Put reports the queue
// full at capaciity - 1 elements.
func (q *EsQueue) Capaciity() uint32 {
return q.capaciity
}
// Quantity returns the number of elements currently stored in the queue.
//
// The value is a snapshot: other goroutines may Put or Get concurrently, so it
// can be stale by the time the caller looks at it.
func (q *EsQueue) Quantity() uint32 {
	// Both counters are modified with atomic CAS elsewhere, so read them
	// atomically as well. getPos is loaded first so that a concurrent Get
	// cannot make the snapshot look emptier than the queue ever was.
	getPos := atomic.LoadUint32(&q.getPos)
	putPos := atomic.LoadUint32(&q.putPos)

	// putPos and getPos are free-running uint32 counters; they are only masked
	// when indexing the slot slice. Their modular difference is therefore the
	// element count even after putPos has wrapped past 2^32. Interpreting the
	// difference as signed lets a getPos observed ahead of putPos show up as a
	// negative number, which is reported as an empty queue.
	if n := int32(putPos - getPos); n > 0 {
		return uint32(n)
	}
	return 0
}
// Put appends val to the queue without blocking on a full queue.
//
// It returns ok == false together with the observed element count when the
// queue already holds capMod (slot count - 1) elements. On success it returns
// ok == true and the element count including val, as seen when the position
// was reserved.
//
// After reserving a position Put spins (yielding the processor) until the
// target slot has been released by the Get that last used it.
func (q *EsQueue) Put(val interface{}) (ok bool, quantity uint32) {
	capMod := q.capMod

	var putPosNew, posCnt uint32
	for {
		// Load getPos before putPos: a stale getPos can only make the queue
		// look fuller than it is, which errs on the side of rejecting.
		getPos := atomic.LoadUint32(&q.getPos)
		putPos := atomic.LoadUint32(&q.putPos)

		// The counters are free-running uint32 values, so their modular
		// difference is the element count even after putPos wraps past 2^32.
		// A negative signed difference means getPos was observed ahead of
		// putPos; nothing is stored in that case.
		posCnt = 0
		if n := int32(putPos - getPos); n > 0 {
			posCnt = uint32(n)
		}
		if posCnt >= capMod {
			runtime.Gosched()
			return false, posCnt
		}

		// Reserve the next position; retry if another producer won the race.
		putPosNew = putPos + 1
		if atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
			break
		}
		runtime.Gosched()
	}

	cache := &q.cache[putPosNew&capMod]

	// Wait until the previous occupant of this slot has been consumed.
	// NOTE(review): mark and value are plain fields shared between goroutines;
	// this handoff is a data race under the Go memory model. Making it sound
	// needs an atomically accessed flag, which means changing esCache together
	// with Put and Get.
	for cache.mark {
		runtime.Gosched()
	}
	cache.value = val
	cache.mark = true
	return true, posCnt + 1
}
// Get removes and returns the oldest element without blocking on an empty
// queue.
//
// It returns (nil, false, 0) when no element is available. On success it
// returns the element, ok == true, and the number of elements left behind as
// seen when the position was reserved.
//
// After reserving a position Get spins (yielding the processor) until the
// producer that reserved the same position has finished writing the slot.
func (q *EsQueue) Get() (val interface{}, ok bool, quantity uint32) {
	capMod := q.capMod

	var getPosNew, posCnt uint32
	for {
		// Load getPos before putPos. With the opposite order a stale putPos
		// can end up below a fresher getPos, and an empty queue would be
		// mistaken for a non-empty one: Get would then reserve a position no
		// producer has claimed and spin instead of returning ok == false.
		getPos := atomic.LoadUint32(&q.getPos)
		putPos := atomic.LoadUint32(&q.putPos)

		// The counters are free-running uint32 values, so their modular
		// difference is the element count even after putPos wraps past 2^32.
		// Zero or a negative signed difference means nothing is available.
		n := int32(putPos - getPos)
		if n < 1 {
			runtime.Gosched()
			return nil, false, 0
		}
		posCnt = uint32(n)

		// Reserve the next position; retry if another consumer won the race.
		getPosNew = getPos + 1
		if atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
			break
		}
		runtime.Gosched()
	}

	cache := &q.cache[getPosNew&capMod]

	// Wait until the producer has published the element in this slot.
	// NOTE(review): mark and value are plain fields shared between goroutines;
	// this handoff is a data race under the Go memory model. Making it sound
	// needs an atomically accessed flag, which means changing esCache together
	// with Put and Get.
	for !cache.mark {
		runtime.Gosched()
	}
	val = cache.value
	cache.value = nil // drop the reference so the element can be collected
	cache.mark = false
	return val, true, posCnt - 1
}
// minQuantity rounds v up to the nearest power of two; a value that already is
// a power of two is returned unchanged. An input of 0, or any input above
// 1<<31, wraps around and yields 0.
func minQuantity(v uint32) uint32 {
	n := v - 1
	// Smear the highest set bit into every lower bit position, doubling the
	// shift width each round (1, 2, 4, 8, 16).
	for shift := uint(1); shift <= 16; shift <<= 1 {
		n |= n >> shift
	}
	return n + 1
}
使用golang的Test测试结果
Go: 1, Times: 1000000, use: 40.5283ms, 40ns/op
Go: 2, Times: 2000000, use: 109.0824ms, 54ns/op
Go: 3, Times: 3000000, use: 212.6447ms, 70ns/op
Go: 4, Times: 4000000, use: 290.6942ms, 72ns/op
Go: 5, Times: 5000000, use: 370.2461ms, 74ns/op
Go: 6, Times: 6000000, use: 436.29ms, 72ns/op
Go: 7, Times: 7000000, use: 518.3449ms, 74ns/op
Go: 8, Times: 8000000, use: 595.8935ms, 74ns/op
Go: 9, Times: 9000000, use: 681.9546ms, 75ns/op
Go: 10, Times: 1000000, use: 83.5561ms, 83ns/op
Go: 11, Times: 1100000, use: 90.5608ms, 82ns/op
Go: 12, Times: 1200000, use: 98.0624ms, 81ns/op
Go: 13, Times: 1300000, use: 105.5723ms, 81ns/op
Go: 14, Times: 1400000, use: 114.0762ms, 81ns/op
Go: 15, Times: 1500000, use: 122.5817ms, 81ns/op
Go: 16, Times: 1600000, use: 130.5921ms, 81ns/op
Go:Sum, Times: 54100000, use: 4.0006803s, 73ns/op