package main
import (
	"fmt"
	"time"

	queue "github.com/fwhezfwhez/go-queue"
)
// main walks through the go-queue API: first a plain queue (push, print, pop,
// length, and the concurrency-safe push/pop variants), then a time queue whose
// contents are printed right away, after 5 seconds and after 14 seconds.
func main() {
	// Initialise an empty queue.
	q := queue.NewEmpty()
	// Push two values.
	q.Push(5)
	q.Push(4)
	// Print the queue.
	q.Print()
	// Pop one value and show what Pop returned.
	fmt.Println(q.Pop())
	// Print the queue again.
	q.Print()
	// Show the queue length.
	fmt.Println(q.Length())
	// Concurrency-safe push.
	q.SafePush(6)
	// Concurrency-safe pop. Println (not Print) so the popped value ends with a
	// newline, like the Pop output above, instead of running into q.Print().
	fmt.Println(q.SafePop())
	q.Print()

	// Time queue.
	// NOTE(review): the three arguments are presumably the expiry duration, a
	// size and the spy interval; confirm against the go-queue documentation.
	tq := queue.TimeQueueWithTimeStep(10*time.Second, 50, 1*time.Nanosecond)
	tq.StartTimeSpying()
	tq.TPush(5)
	tq.SafeTPush(6)
	fmt.Println("init:")
	tq.Print()
	time.Sleep(5 * time.Second)
	fmt.Println("after 5s:")
	tq.Print()
	// 5s + 9s of sleeping: 14 seconds have passed since the pushes.
	time.Sleep(9 * time.Second)
	fmt.Println("after 14s")
	tq.Print()
}
// StartTimeSpying announces the time supervisor on standard output and then
// runs startTimeSpying on a new goroutine, so this call does not wait for the
// spying loop to finish.
func (q *Queue) StartTimeSpying() {
	fmt.Println("time supervisor starts")
	go func() {
		// The supervisor's error result is dropped, just as a bare go
		// statement drops the return value of the call it launches.
		_ = q.startTimeSpying()
	}()
}
// detail of StartTimeSpying function
func (q *Queue) startTimeSpying() error {
var err = make(chan string, 0)
go func(queue *Queue, er chan string) {
fmt.Println("start time spying, data in the queue can stay for " q.ExpireAfter.String())
for {
if queue.timeSpy == false {
err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)"
return
}
select {
case <-queue.flag:
fmt.Println("time spy executing stops")
return
default:
fmt.Print()
}
ok,er:=queue.timingRemove()
if er!=nil{
err <- er.(errorx.Error).StackTrace()
}
if ok {
time.Sleep(queue.timeStep)
}
}
}(q, err)
select {
case msg := <-err:
fmt.Println("time spy supervisor accidentally stops because: ",msg)
return errorx.NewFromString(msg)
case <-q.flag:
fmt.Println("time spy supervisor stops")
return nil
}
}
// timingRemove pops entries from the head of the queue for as long as the
// head has outlived q.ExpireAfter.
//
// The head's CreatedAt is read as Unix seconds; the entry counts as expired
// when CreatedAt plus q.ExpireAfter, truncated to whole seconds, is strictly
// before the current Unix second.
//
// It returns (true, nil) when the queue is empty or its head has not expired
// yet. It returns (false, err) when THead or TPop fails (the error is wrapped
// with errorx.Wrap) or when THead reports a negative index.
func (q *Queue) timingRemove() (bool, error) {
	for len(q.Data) >= 1 {
		oldest, pos, headErr := q.THead()
		if headErr != nil {
			return false, errorx.Wrap(headErr)
		}
		if pos < 0 {
			return false, errorx.NewFromString("queue'length goes 0")
		}

		// The clock is re-read for every entry that gets examined.
		current := time.Now().Unix()
		deadline := time.Unix(oldest.CreatedAt, 0).Add(q.ExpireAfter).Unix()
		if deadline >= current {
			// Head is still within its lifetime: nothing more to remove.
			return true, nil
		}

		// Head is out of time: drop it and look at the next one.
		if _, _, popErr := q.TPop(); popErr != nil {
			return false, errorx.Wrap(popErr)
		}
	}
	return true, nil
}
本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
转藏
分享
QQ空间
QQ好友
新浪微博
微信
献花(0)
+1
来自: >