背景

延迟队列是一种特殊的队列,元素入队时需要指定到期时间(或延迟时间),从队头出队的元素必须是已经到期的,而且最先到期的元素最先出队,也就是队列里面的元素是按照到期时间排序的,添加元素和从队头出队的时间复杂度是O(log(n))。

由于以上性质,延迟队列一般可以用于以下场景(定时任务、延迟任务):

  • 缓存:用户淘汰过期元素
  • 通知:在指定时间通知用户,比如会议开始前30分钟
  • 订单:30分钟未支付取消订单
  • 超时:服务器自动断开太长时间没有心跳的连接
time.After()time.AfterFunc()

原理

延迟队列每次出队的是最小到期时间的元素,而堆就是用来获取最值的数据结构。使用堆我们可以实现O(log(n))时间复杂度添加元素和移除最小到期时间元素。

随机删除

有时候延迟队列还需要具有随机删除元素的能力,可以通过以下方式实现:

  • 元素添加删除标记字段:堆中每个元素都添加一个删除标记字段,并把这个元素的地址返回给用户,用户就可以标记元素的这个字段为true,这样元素到达堆顶时如果判断到这个字段为true就会被清除,而延迟队列里的元素逻辑上是一定会到达堆顶的(因为时间会流逝)。这是一种懒删除的方式。
  • 元素添加堆中下标字段(或用map记录下标):堆中每个元素都添加一个堆中下标字段,并把这个元素的地址返回给用户,这样我们就可以通过这个元素里面记录的下标快速定位元素在堆中的位置,从而删除元素。详细可以看文章如何实现一个支持O(log(n))随机删除元素的堆。

重置元素到期时间

元素添加堆中下标字段

Golang实现

这里我们实现一个最简单的延迟队列,也就是不支持随机删除元素和重置元素的到期时间,因为有些场景只需要添加元素和获取到期元素这两个功能,比如Kafka中的时间轮,而且这种简单实现性能会高一点。

数据结构

主要的结构可以看到就是一个heap,Entry是每个元素在堆中的表示,Value是具体的元素值,Expired是为了堆中元素根据到期时间排序。

mutex是一个互斥锁,主要是保证操作并发安全。

wakeup是一个缓冲区长度为1的通道,通过它实现添加元素的时候唤醒等待队列不为空或者有更小到期时间元素加入的协程。(重点)

type Entry[T any] struct {
	Value   T
	Expired time.Time // 到期时间
}

// 延迟队列
type DelayQueue[T any] struct {
	h      *heap.Heap[*Entry[T]]
	mutex  sync.Mutex    // 保证并发安全
	wakeup chan struct{} // 唤醒通道
}

// 创建延迟队列
func New[T any]() *DelayQueue[T] {
	return &DelayQueue[T]{
		h: heap.New(nil, func(e1, e2 *Entry[T]) bool {
			return e1.Expired.Before(e2.Expired)
		}),
		wakeup: make(chan struct{}, 1),
	}
}

实现原理

阻塞获取元素的时候如果队列已经没有元素,或者没有元素到期,那么协程就需要挂起等待。而被唤醒的条件是元素到期、队列不为空或者有更小到期时间元素加入。

其中元素到期协程在阻塞获取元素时发现堆顶元素还没到期,因此这个条件可以自己构造并等待。但是条件队列不为空和有更小到期时间元素加入则需要另外一个协程在添加元素时才能满足,因此必须通过一个中间结构来进行协程间通信,一般Golang里面会使用Channel来实现。

添加元素

一开始加了一个互斥锁,避免并发冲突,然后把元素加到堆里。

因为我们Take()操作,既阻塞获取元素操作,在不满足条件时会去等待wakeup通道,但是等待通道前必须释放锁,否则Push()无法写入新元素去满足条件队列不为空和有更小到期时间元素加入。而从释放锁后到开始读取wakeup通道这段时间是没有锁保护的,如果Push()在这期间插入新元素,为了保证通道不阻塞同时又能通知到Take()协程,我们的通道的长度需要是1,同时使用select+default保证在通道里面已经有元素的时候不阻塞Push()协程。

// 添加延迟元素到队列
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	entry := &Entry[T]{
		Value:   value,
		Expired: time.Now().Add(delay),
	}
	q.h.Push(entry)
	// 唤醒等待的协程
	// 这里表示新添加的元素到期时间是最早的,或者原来队列为空
	// 因此必须唤醒等待的协程,因为可以拿到更早到期的元素
	if q.h.Peek() == entry {
		select {
		case q.wakeup <- struct{}{}:
		default:
		}
	}
}

阻塞获取元素

这里先判断堆是否有元素,如果有获取堆顶元素,然后判断是否已经到期,如果到期则直接出堆并返回。否则等待直到超时或者元素到期或者有新的元素到达。

这里在解锁之前会清空wakeup通道,这样可以保证下面读取的wakeup通道里的元素肯定是新加入的。

// 等待直到有元素到期
// 或者ctx被关闭
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var expired *time.Timer
		q.mutex.Lock()
		// 有元素
		if !q.h.Empty() {
			// 获取元素
			entry := q.h.Peek()
			if time.Now().After(entry.Expired) {
				q.h.Pop()
				q.mutex.Unlock()
				return entry.Value, true
			}
			// 到期时间,使用time.NewTimer()才能够调用Stop(),从而释放定时器
			expired = time.NewTimer(time.Until(entry.Expired))
		}
		// 避免被之前的元素假唤醒
		select {
		case <-q.wakeup:
		default:
		}
		q.mutex.Unlock()

		// 不为空,需要同时等待元素到期
                // 并且除非expired到期,否则都需要关闭expired避免泄露
		if expired != nil {
			select {
			case <-q.wakeup: // 新的更快到期元素
				expired.Stop()
			case <-expired.C: // 首元素到期
			case <-ctx.Done(): // 被关闭
				expired.Stop()
				var t T
				return t, false
			}
		} else {
			select {
			case <-q.wakeup: // 新的更快到期元素
			case <-ctx.Done(): // 被关闭
				var t T
				return t, false
			}
		}
	}
}

Channel方式阻塞读取

Golang里面可以使用Channel进行流式消费,因此简单包装一个Channel形式的阻塞读取接口,给通道一点缓冲区大小可以带来更好的性能。

// 返回一个通道,输出到期元素
// size是通道缓存大小
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		for {
			entry, ok := q.Take(ctx)
			if !ok {
				return
			}
			out <- entry
		}
	}()
	return out
}

使用方式

for entry := range q.Channel(context.Background(), 10) {
    // do something
}

性能测试

这里进行一个简单的性能测试,也就是先添加元素,然后等待到期后全部拿出来。

func BenchmarkPushAndTake(b *testing.B) {
	q := New[int]()
	b.ResetTimer()
        
        // 添加元素
	for i := 0; i < b.N; i++ {
		q.Push(i, time.Duration(i))
	}
        
        // 等待全部元素到期
	b.StopTimer()
	time.Sleep(time.Duration(b.N))
	b.StartTimer()

        // 获取元素
	for i := 0; i < b.N; i++ {
		_, ok := q.Take(context.Background())
		if !ok {
			b.Errorf("want %v, but %v", true, ok)
		}
	}
}

测试结果:

Benchmark-8      2331534               476.8 ns/op            76 B/op          1 allocs/op

总结

堆实现的延迟队列是一种实现起来比较简单的定时器(当然阻塞读取Take()是比较复杂的),由于时间复杂度是O(log(n)),因此可以满足定时任务数量不是特别多的场景。堆实现的延迟队列也是可以随机删除元素的,可以根据具体任务选择是否实现。如果对定时器性能要求比较敏感的话可以选择使用时间轮实现定时器,它可以在O(1)的时间复杂度添加和删除一个定时器,不过实现起来比较复杂(挖个坑,下篇文章实现)。