ce371be518d4427a970584641897b994~tplv-k3u1fbpfcp-watermark.image?

go在1.18迎来了一次大的改版,引进了相关泛型。这使得在开发过程有了更多的扩展性,可以有效解决功能相同但是代码重复的部分。

这次使用泛型+管道的方式实现了一个简单的阻塞队列。

对于管道而言就是go提供给我们天然的阻塞队列,我们在此基础上再根据泛型的特性可以完成对于任何类型的队列方式,此队列对于高并发的情况下依旧有效,类似于生产者消费者的应用。

直接上code:


import (
   "context"
   "time"
)

/**
 @author: nizhenxian
 @date: 2022/3/11 10:07:12
**/

// RejectHandler Reject push into queue if return false.
type RejectHandler func(ctx context.Context) bool

type Queue[T any] interface {
   Push(value T)
   TryPush(value T, timeout time.Duration) bool
   Poll() T
   TryPoll(timeout time.Duration) (T, bool)
}

type BlockingQueue[T any] struct {
   q       chan T // 阻塞队列使用通道表示
   limit   int    // 阻塞队列的大小
   ctx     context.Context
   handler RejectHandler
}

func NewBlockingQueue[T any](ctx context.Context, queueSize int) *BlockingQueue[T] {
   return &BlockingQueue[T]{
      q:     make(chan T, queueSize),
      limit: queueSize,
      ctx:   ctx,
   }
}

// SetRejectHandler Set a reject handler.
func (q *BlockingQueue[T]) SetRejectHandler(handler RejectHandler) {
   q.handler = handler
}

func (q *BlockingQueue[T]) Push(value T) {
   ok := true
   if q.handler != nil {
      select {
      case q.q <- value:
         return
      default:
         ok = q.handler(q.ctx)
      }
   }
   if ok {
      q.q <- value
   }
}

func (q *BlockingQueue[T]) TryPush(value T, timeout time.Duration) bool {
   ctx, cancel := context.WithTimeout(q.ctx, timeout)
   defer cancel()
   select {
   case q.q <- value:
      return true
   case <-ctx.Done():
   }
   return false
}

func (q *BlockingQueue[T]) Poll() T {
   ret := <-q.q
   return ret
}

func (q *BlockingQueue[T]) TryPoll(timeout time.Duration) (ret T, ok bool) {
   ctx, cancel := context.WithTimeout(q.ctx, timeout)
   defer cancel()
   select {
   case ret = <-q.q:
      return ret, true
   case <-ctx.Done():
   }
   return ret, false
}

func (q *BlockingQueue[T]) size() int {
   return len(q.q)
}

使用:

func main() {
   a := NewBlockingQueue[int](context.Background(), 3)
   a.SetRejectHandler(func(ctx context.Context) bool {
      fmt.Println("reject")
      return false
   })
   for i := 0; i < 4; i++ {
      go a.Push(i)
   }
   select {}
}