package utils
// QueueData is one queued item: the caller's payload plus the time it was enqueued.
type QueueData struct {
// Data is the value given to Push; process passes it unchanged to the consumer function.
Data interface{}
// CreateAt is the enqueue time in Unix nanoseconds (Push fills it from UnixNano()).
CreateAt int64
}
// Queue holds pushed items for a delay and then hands each payload to a consumer function.
// The goroutine running process (started by NewSimulatorQueue) polls the list for items
// whose delay has passed.
type Queue struct {
// mux guards mutations of data.
mux sync.RWMutex
// data holds the pending QueueData values; Push re-sorts it with queueComparator after every insert.
data *arraylist.List
// queueConsumerFun is invoked with an item's Data once the item is due.
queueConsumerFun QueueConsumerFunc
// elapsedDuration is the base delay between Push and delivery, before the speed-up is applied.
elapsedDuration time.Duration
// timesTheSpeed divides elapsedDuration to give the effective delay; NewSimulatorQueue replaces 0 with 1.
timesTheSpeed uint8
// close is set by Close; process leaves its loop once it sees the flag.
close bool
}
// QueueConsumerFunc is the callback that receives the payload of a due item.
// process launches it in its own goroutine and does not inspect the returned error.
type QueueConsumerFunc func(data interface{}) error
// queueComparator orders two QueueData values by CreateAt, oldest first.
//
// It returns -1 when a was created before b, 1 when a was created after b,
// and 0 when both carry the same timestamp. Both arguments must hold a
// QueueData value; any other dynamic type makes the type assertion panic.
func queueComparator(a, b interface{}) int {
	c1, c2 := a.(QueueData), b.(QueueData)
	switch {
	case c1.CreateAt < c2.CreateAt:
		return -1
	case c1.CreateAt > c2.CreateAt:
		return 1
	default:
		// Equal timestamps must compare as 0 in both directions so the
		// comparator stays a consistent ordering for Sort.
		return 0
	}
}
// NewSimulatorQueue builds a Queue and starts its background processing goroutine.
//
// elapsed is the base delay between Push and delivery, timesTheSpeed is the
// divisor applied to that delay (0 is treated as 1), and f is the consumer
// called with each due payload. The goroutine keeps running until Close is called.
func NewSimulatorQueue(elapsed time.Duration, timesTheSpeed uint8, f QueueConsumerFunc) *Queue {
	speed := timesTheSpeed
	if speed == 0 {
		// A zero multiplier would divide by zero when the delay is computed.
		speed = 1
	}

	queue := new(Queue)
	queue.data = arraylist.New()
	queue.queueConsumerFun = f
	queue.elapsedDuration = elapsed
	queue.timesTheSpeed = speed

	go queue.process()
	return queue
}
// Push enqueues data, stamping it with the current time in Unix nanoseconds.
// The list is re-sorted by timestamp after the insert. The whole operation
// runs while holding q.mux.
func (q *Queue) Push(data interface{}) {
	q.mux.Lock()
	defer q.mux.Unlock()

	createdAt := time.Now().In(components.Loc).UnixNano()
	entry := QueueData{Data: data, CreateAt: createdAt}
	q.data.Add(entry)
	q.data.Sort(queueComparator)
}
// Close asks the background processing loop to stop.
//
// It only sets a flag; the loop exits the next time it checks the flag, and
// items still waiting in the queue at that point are not delivered. The flag
// is written while holding q.mux because another goroutine reads it, and an
// unsynchronized write would be a data race. Calling Close more than once is
// harmless.
func (q *Queue) Close() {
	q.mux.Lock()
	defer q.mux.Unlock()
	q.close = true
}
// process is the background loop started by NewSimulatorQueue.
//
// Every 200µs it collects the items whose delay has passed and launches the
// consumer function for each of them in its own goroutine. The consumer's
// error result is not inspected (fire-and-forget). The loop returns once the
// close flag is set; items still queued at that point are not delivered.
func (q *Queue) process() {
	for {
		due, closed := q.takeDue(time.Now())
		if closed {
			return
		}
		// Dispatch outside the lock so a consumer may call Push without
		// contending with this loop.
		for _, item := range due {
			go q.queueConsumerFun(item.Data)
		}
		time.Sleep(time.Microsecond * 200)
	}
}

// takeDue removes and returns every item that is due at now, oldest first.
// The second result reports whether the queue has been closed, in which case
// nothing is removed.
func (q *Queue) takeDue(now time.Time) ([]QueueData, bool) {
	// The scan and the removals happen under the same lock Push takes, so
	// the list cannot be mutated or re-sorted underneath the iterator.
	q.mux.Lock()
	defer q.mux.Unlock()

	if q.close {
		return nil, true
	}

	delay := q.elapsedDuration / time.Duration(q.timesTheSpeed)

	var due []QueueData
	for iter := q.data.Iterator(); iter.Next(); {
		item := iter.Value().(QueueData)
		// CreateAt is Unix nanoseconds. Comparing instants does not depend
		// on the time zone, so no location conversion is needed.
		if now.Before(time.Unix(0, item.CreateAt).Add(delay)) {
			// Push keeps the list sorted by CreateAt, so every later item
			// is not due yet either.
			break
		}
		due = append(due, item)
	}

	// The due items form a prefix of the list. Removing them only after the
	// scan avoids shifting elements under the iterator, which would make it
	// skip the element that moves into the removed slot.
	for range due {
		q.data.Remove(0)
	}
	return due, false
}
使用:
// Example wiring: each queue is created with its own delay, the shared speed multiplier, and the handler to run for each item.
// NOTE(review): the delay is time.Second multiplied by a SimConf field — presumably a number of seconds; confirm that field's type.
haha.InitBindBoxTaskQueue = utils.NewSimulatorQueue(time.Second*helpers.SimConf.InitBindBoxElapsed, helpers.SimConf.Multiplier, chute.ExecuteInitBindBox)
wawa.AgvArrivedTaskQueue = utils.NewSimulatorQueue(time.Second*helpers.SimConf.AgvArrivedElapsed, helpers.SimConf.Multiplier, agv.ExecuteAgvArrived)
说明:haha 和 wawa 只是包名而已,例如:
package haha
// InitBindBoxTaskQueue is the package-level queue that the Push call below feeds.
var InitBindBoxTaskQueue *utils.Queue
// Build the task payload from the first entry of chuteList.
// NOTE(review): assumes chuteList is non-empty — confirm at the call site.
params := ExecuteInitBindBoxParams{
ChuteId: chuteList[0].ChuteID,
TagId: chuteList[0].DeviceID,
ContainerCode: chuteList[0].ContainerID,
}
// Enqueue the payload; Push accepts it as an interface{} value.
InitBindBoxTaskQueue.Push(params)