// Queue is a FIFO queue of strings with an optional capacity limit. Its put
// and get methods can wait a bounded time for free space or for data.
type Queue struct {
	content []string // queued messages, oldest first; guarded by lock

	// Timeout controls how long put and get wait, in seconds: 0 waits
	// indefinitely, a negative value never waits, and a positive value waits
	// at most that many seconds.
	Timeout int

	// MaxSize is the queue capacity. A value <= 0 means unbounded, in which
	// case put never waits; with a positive value put starts waiting once the
	// queue holds MaxSize messages.
	MaxSize int
}

// lock guards the content of every Queue. It is only held for short critical
// sections, never across a whole wait, so producers and consumers cannot
// starve each other.
var lock = sync.Mutex{}

// queueChanged is broadcast whenever a message is added or removed, and when a
// waiter's deadline passes, so that goroutines blocked in put or get re-check
// their condition instead of spinning.
var queueChanged = sync.NewCond(&lock)

// waitLocked blocks until ready reports true or the queue's Timeout expires,
// and reports whether ready became true. The caller must hold lock; ready is
// only ever evaluated with lock held.
func (q *Queue) waitLocked(ready func() bool) bool {
	// Check the state first so that a non-waiting queue (Timeout < 0) still
	// succeeds whenever the operation is immediately possible.
	if ready() {
		return true
	}
	if q.Timeout < 0 {
		return false
	}

	expired := false
	if q.Timeout > 0 {
		timer := time.AfterFunc(time.Duration(q.Timeout)*time.Second, func() {
			// Set the flag under lock so the waiter cannot miss the wake-up
			// between checking expired and calling Wait.
			lock.Lock()
			expired = true
			lock.Unlock()
			queueChanged.Broadcast()
		})
		defer timer.Stop()
	}

	for !ready() {
		if expired {
			return false
		}
		queueChanged.Wait()
	}
	return true
}

// put appends msg to the queue. If the queue is full it waits for free space
// according to Timeout; when the wait expires (or waiting is disabled) the
// message is silently discarded.
func (q *Queue) put(msg string) {
	lock.Lock()
	defer lock.Unlock()

	hasRoom := func() bool {
		return q.MaxSize <= 0 || len(q.content) < q.MaxSize
	}
	if !q.waitLocked(hasRoom) {
		return // timed out: the message is dropped
	}

	q.content = append(q.content, msg)
	queueChanged.Broadcast()
}

// get removes and returns the oldest message. If the queue is empty it waits
// for data according to Timeout; when the wait expires (or waiting is
// disabled) it returns the empty string.
func (q *Queue) get() string {
	lock.Lock()
	defer lock.Unlock()

	hasData := func() bool {
		return len(q.content) > 0
	}
	if !q.waitLocked(hasData) {
		return ""
	}

	msg := q.content[0]
	q.content[0] = "" // release the string held by the backing array
	q.content = q.content[1:]
	queueChanged.Broadcast()
	return msg
}
// main runs a small demo: a producer goroutine puts 50 messages into a queue
// with capacity 4 and a 3-second timeout while the main goroutine reads 50
// messages back and prints them.
func main() {
	q := Queue{
		content: []string{},
		Timeout: 3,
		MaxSize: 4,
	}

	// producerDone is closed when the producer goroutine has finished, so that
	// main does not exit before the producer's last line has been printed.
	producerDone := make(chan struct{})
	go func() {
		defer close(producerDone)
		for i := 0; i < 50; i++ {
			st := "hello" + strconv.Itoa(i)
			q.put(st)
			fmt.Println(st)
		}
	}()

	for i := 0; i < 50; i++ {
		fmt.Println("read: " + q.get())
	}
	<-producerDone
}