package  main
import (
	queue "github.com/fwhezfwhez/go-queue"
	"fmt"
)
func main() {
    //初始化,init
    q:= queue.NewEmpty()
    //压入,push
    q.Push(5)
    q.Push(4)
    //打印,print
    q.Print()
    //出列,pop
    fmt.Println(q.Pop())
    //打印,print
    q.Print()
    //长度,len
    fmt.Println(q.Length())
    //并发安全压入,currently safe push
    q.SafePush(6)
    //并发安全出列,currently safe pop
    fmt.Print(q.SafePop())
    q.Print()

    // time queue
    tq := queue.TimeQueueWithTimeStep(10*time.Second, 50, 1*time.Nanosecond)
    tq.StartTimeSpying()
    tq.TPush(5)
    tq.SafeTPush(6)

    fmt.Println("init:")
    tq.Print()

    time.Sleep(5 * time.Second)
    fmt.Println("after 5s:")
    tq.Print()

    time.Sleep(9 * time.Second)
    fmt.Println("after 14s")
    tq.Print()
}
// start to spy on queue's time-out data and throw it
func (q *Queue) StartTimeSpying() {
	fmt.Println("time supervisor starts")
	go q.startTimeSpying()
}

// detail of StartTimeSpying function
func (q *Queue) startTimeSpying() error {
	var err = make(chan string, 0)
	go func(queue *Queue, er chan string) {
		fmt.Println("start time spying, data in the queue can stay for "   q.ExpireAfter.String())
		for {
			if queue.timeSpy == false {
				err <- "spying routine stops because: queue's timeSpy is false, make sure the queue is definition by q=TimeQueue(time.Duration,int)"
				return
			}
			select {
			case <-queue.flag:
				fmt.Println("time spy executing stops")
				return
			default:
				fmt.Print()
			}
			ok,er:=queue.timingRemove()
			if er!=nil{
				err <- er.(errorx.Error).StackTrace()
			}
			if ok {
				time.Sleep(queue.timeStep)
			}
		}
	}(q, err)
	select {
	case msg := <-err:
		fmt.Println("time spy supervisor accidentally stops because: ",msg)
		return errorx.NewFromString(msg)
	case <-q.flag:
		fmt.Println("time spy supervisor stops")
		return nil
	}
}


// remove those time-out data
func (q *Queue) timingRemove() (bool,error) {
	if len(q.Data) <1 {
		return true,nil
	}
	head, index, er := q.THead()
	if er != nil {
		return false, errorx.Wrap(er)
	}
	if index < 0 {
		return false, errorx.NewFromString("queue'length goes 0")
	}
	now := time.Now().Unix()
	created := time.Unix(head.CreatedAt, 0)
	//fmt.Println("now:",now)
	//fmt.Println("expire:",created.Add(q.ExpireAfter).Unix())
	if created.Add(q.ExpireAfter).Unix() < now {
		// out of time
		_,_,e := q.TPop()
		if e!=nil {
			return false, errorx.Wrap(e)
		}
		if len(q.Data) >0 {
			return q.timingRemove()
		}else{
			return true,nil
		}
	} else{
		return true ,nil
	}
}

本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
转藏 分享
QQ空间 QQ好友 新浪微博 微信
献花(0) +1

来自: >