go在1.18迎来了一次大的改版,引进了相关泛型。这使得在开发过程有了更多的扩展性,可以有效解决功能相同但是代码重复的部分。
这次使用泛型+管道的方式实现了一个简单的阻塞队列。
对于管道而言就是go提供给我们天然的阻塞队列,我们在此基础上再根据泛型的特性可以完成对于任何类型的队列方式,此队列对于高并发的情况下依旧有效,类似于生产者消费者的应用。
直接上code:
import (
"context"
"time"
)
/**
@author: nizhenxian
@date: 2022/3/11 10:07:12
**/
// RejectHandler Reject push into queue if return false.
type RejectHandler func(ctx context.Context) bool
type Queue[T any] interface {
Push(value T)
TryPush(value T, timeout time.Duration) bool
Poll() T
TryPoll(timeout time.Duration) (T, bool)
}
type BlockingQueue[T any] struct {
q chan T // 阻塞队列使用通道表示
limit int // 阻塞队列的大小
ctx context.Context
handler RejectHandler
}
func NewBlockingQueue[T any](ctx context.Context, queueSize int) *BlockingQueue[T] {
return &BlockingQueue[T]{
q: make(chan T, queueSize),
limit: queueSize,
ctx: ctx,
}
}
// SetRejectHandler Set a reject handler.
func (q *BlockingQueue[T]) SetRejectHandler(handler RejectHandler) {
q.handler = handler
}
func (q *BlockingQueue[T]) Push(value T) {
ok := true
if q.handler != nil {
select {
case q.q <- value:
return
default:
ok = q.handler(q.ctx)
}
}
if ok {
q.q <- value
}
}
func (q *BlockingQueue[T]) TryPush(value T, timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(q.ctx, timeout)
defer cancel()
select {
case q.q <- value:
return true
case <-ctx.Done():
}
return false
}
func (q *BlockingQueue[T]) Poll() T {
ret := <-q.q
return ret
}
func (q *BlockingQueue[T]) TryPoll(timeout time.Duration) (ret T, ok bool) {
ctx, cancel := context.WithTimeout(q.ctx, timeout)
defer cancel()
select {
case ret = <-q.q:
return ret, true
case <-ctx.Done():
}
return ret, false
}
func (q *BlockingQueue[T]) size() int {
return len(q.q)
}
使用:
func main() {
a := NewBlockingQueue[int](context.Background(), 3)
a.SetRejectHandler(func(ctx context.Context) bool {
fmt.Println("reject")
return false
})
for i := 0; i < 4; i++ {
go a.Push(i)
}
select {}
}