golang原生定时器
原生的定时器存在一些问题
- 我们其实能够发现Go语言标准库中的定时器在计时时间较短并且并发较高时有着非常明显的问题,所以在一些性能非常敏感的基础服务中使用定时器一定要非常注意,它可能达不到我们预期的效果。
- 类似tcp服务端心跳的场景,每一个链接都开一个定时器,如果链接较多影响性能。
什么是时间轮
延迟队列DelayQueue
如果看完上面,会发现博主在最后提到了延迟队列DelayQueue,DelayQueue使用最小堆实现,把队列里的元素按照过期时间排序。然后开一个协程死循环与队列第一个元素比较。
golang实现DelayQueue
实现延迟队列DelayQueue要先实现一个优先队列PriorityQueue,PriorityQueue的作用是使用最小堆来找到过期时间最近的元素。
PriorityQueue实现了heap接口
//队列中的元素
type item struct {
Value interface{}
Priority int64 //优先权
Index int
}
//这是由最小堆实现的优先队列
type priorityQueue []*item
func newPriorityQueue(capacity int) priorityQueue {
return make(priorityQueue, 0, capacity)
}
func (pq priorityQueue) Len() int {
return len(pq)
}
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].Priority < pq[j].Priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].Index = i
pq[j].Index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
c := cap(*pq)
//动态扩容
if n+1 > c {
npq := make(priorityQueue, n, c*2)
copy(npq, *pq)
*pq = npq
}
*pq = (*pq)[0 : n+1]
item := x.(*item)
item.Index = n
(*pq)[n] = item
}
func (pq *priorityQueue) Pop() interface{} {
n := len(*pq)
c := cap(*pq)
//动态缩容
if n < (c/2) && c > 25 {
npq := make(priorityQueue, n, c/2)
copy(npq, *pq)
*pq = npq
}
item := (*pq)[n-1]
item.Index = -1
*pq = (*pq)[0 : n-1]
return item
}
// 与最小堆的堆顶比较
// 如果当前时间小于最小堆的堆顶,说明堆里所有的元素均没有到过期时间
// 如果当前时间大于最小堆的堆顶,移出堆顶,并重新排序
func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
if pq.Len() == 0 {
return nil, 0
}
item := (*pq)[0]
//最小堆的顶大于max
if item.Priority > max {
return nil, item.Priority - max
}
heap.Remove(pq, 0)
return item, 0
}
有了PriorityQueue,就开始实现DelayQueue了。
//延迟队列
type DelayQueue struct {
C chan interface{} // 有元素过期时的通知
mu sync.Mutex // 互斥锁
pq priorityQueue // 优先队列
sleeping int32 // 已休眠
wakeupC chan struct{} // 唤醒队列的通知
}
func New(size int) *DelayQueue {
return &DelayQueue{
C: make(chan interface{}), // 无缓冲管道
pq: newPriorityQueue(size), // 优先队列
wakeupC: make(chan struct{}), // 无缓冲管道saw
}
}
//添加元素到队列
func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
item := &item{Value: elem, Priority: expiration} //过期时间作为优先级,过期时间越小的优先级越高
dq.mu.Lock()
heap.Push(&dq.pq, item) // 将元素放到队尾,并递归与父节点做比较
index := item.Index
dq.mu.Unlock()
if index == 0 {
// 如果延迟队列为休眠状态,唤醒他
if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
// 唤醒可能会发生阻塞
dq.wakeupC <- struct{}{}
}
}
}
// Poll启动一个无限循环,在这个循环中它不断地等待一个元素过期,然后将过期的元素发送到通道C。
func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
for {
now := nowF()
dq.mu.Lock()
item, delta := dq.pq.PeekAndShift(now) //与最小堆的堆顶比较
if item == nil {
//没有要过期的定时器, 将延迟队列设置为休眠
//为什么要用atomic原子函数,是为了防止Offer 和 Poll出现竞争
atomic.StoreInt32(&dq.sleeping, 1)
}
dq.mu.Unlock()
if item == nil {
if delta == 0 {
// 说明延迟队列中已经没有timer,因此等待新的timer添加时wake up通知,或者等待退出通知
select {
case <-dq.wakeupC:
continue
case <-exitC:
goto exit
}
} else if delta > 0 {
// 说明延迟队列中存在未过期的定时器
select {
case <-dq.wakeupC:
// 当前定时器已经是休眠状态,如果添加了一个比延迟队列中最早过期的定时器更早的定时器,延迟队列被唤醒
continue
case <-time.After(time.Duration(delta) * time.Millisecond):
// timer.After添加了一个相对时间定时器,并等待到期
if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
//防止被阻塞
<-dq.wakeupC
}
continue
case <-exitC:
goto exit
}
}
}
select {
case dq.C <- item.Value:
case <-exitC:
goto exit
}
}
exit:
// Reset the states
atomic.StoreInt32(&dq.sleeping, 0)
}
好了,那我们现在就有延迟队列了,最先到期的定时器能最先被找到。
golang实现timer
//Timer表示单个事件。当Timer超时时,给定的任务将被执行。
type Timer struct {
expiration int64 // 以毫秒为单位
task func() //任务
b unsafe.Pointer // 所属bucket的指针
element *list.Element // bucket中timers双向链表中的元素
}
//获取定时器所属的bucket
func (t *Timer) getBucket() *bucket {
return (*bucket)(atomic.LoadPointer(&t.b))
}
//设置定时器所属的bucket
func (t *Timer) setBucket(b *bucket) {
atomic.StorePointer(&t.b, unsafe.Pointer(b))
}
// 阻止定时器启动
func (t *Timer) Stop() bool {
stopped := false
for b := t.getBucket(); b != nil; b = t.getBucket() {
//从bucket(时间格)中移除定时器
stopped = b.Remove(t)
}
return stopped
}
golang 实现bucket(时间格)
//时间格
type bucket struct {
expiration int64 // 过期时间
mu sync.Mutex //互斥锁
timers *list.List //定时器链表(双向链表)
}
//new一个时间格
func newBucket() *bucket {
return &bucket{
timers: list.New(),
expiration: -1, //过期时间默认为-1
}
}
//获取过期时间
func (b *bucket) Expiration() int64 {
return atomic.LoadInt64(&b.expiration)
}
//设置过期时间
func (b *bucket) SetExpiration(expiration int64) bool {
return atomic.SwapInt64(&b.expiration, expiration) != expiration
}
//添加定时器
func (b *bucket) Add(t *Timer) {
b.mu.Lock()
e := b.timers.PushBack(t)
t.setBucket(b)
t.element = e
b.mu.Unlock()
}
//删除定时器
func (b *bucket) remove(t *Timer) bool {
if t.getBucket() != b {
//如果定时器所属的bucket不是当前的bucket返回false
return false
}
b.timers.Remove(t.element)
t.setBucket(nil)
t.element = nil
return true
}
func (b *bucket) Remove(t *Timer) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.remove(t)
}
// 刷新
// 1将定时器链表中的定时器全部清空
// 2将定时器链表中的定时器放入到ts切片中(ts = time slice)
// 3将bucket过期时间设置成-1
// 4循环遍历ts切片调用addOrRun方法
func (b *bucket) Flush(reinsert func(*Timer)) {
var ts []*Timer
b.mu.Lock()
//将定时器链表中的定时器全部删除,并放到ts切片中
for e := b.timers.Front(); e != nil; {
next := e.Next()
t := e.Value.(*Timer)
b.remove(t)
ts = append(ts, t)
e = next
}
b.mu.Unlock()
//将bucket的到期时间重新设置成-1
b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()
for _, t := range ts {
reinsert(t)
}
}
golang实现timewheel
//分层时间轮
type TimingWheel struct {
tick int64 // 每一个时间格的跨度,以毫秒为单位
wheelSize int64 // 时间格的数量
interval int64 // 总的跨度数 tick * wheelSize,以毫秒为单位
currentTime int64 // 当前指针指向的时间,以毫秒为单位
buckets []*bucket //时间格列表
queue *DelayQueue //延迟队列,
overflowWheel unsafe.Pointer // 上一层时间轮的指针
exitC chan struct{} // 退出通知
waitGroup waitGroupWrapper
}
//对外暴露的初始化时间轮方法,参数为时间格跨度,和时间格数量
func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
//时间格(毫秒)
tickMs := int64(tick / time.Millisecond)
if tickMs <= 0 {
panic(errors.New("tick must be greater than or equal to 1ms"))
}
//开始时间
startMs := timeToMs(time.Now().UTC())
return newTimingWheel(
tickMs,
wheelSize,
startMs,
New(int(wheelSize)),//delayqueue
)
}
//内部初始化时间轮的方法,参数为,时间格跨度(毫秒),时间格数量,开始时间(毫秒), 延迟队列
func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *DelayQueue) *TimingWheel {
//根据时间格数量创建时间格列表
buckets := make([]*bucket, wheelSize)
for i := range buckets {
buckets[i] = newBucket()
}
return &TimingWheel{
tick: tickMs,
wheelSize: wheelSize,
currentTime: truncate(startMs, tickMs),
interval: tickMs * wheelSize,
buckets: buckets,
queue: queue,
exitC: make(chan struct{}),
}
}
//add添加定时器到时间轮
//如果定时器已过期返回false
func (tw *TimingWheel) add(t *Timer) bool {
//当前秒钟时间轮的currentTime = 1626333377000(2021-07-15 15:16:17)
currentTime := atomic.LoadInt64(&tw.currentTime)
if t.expiration < currentTime+tw.tick {
//定时器的过期时间已经过期返回false
return false
} else if t.expiration < currentTime+tw.interval {
//定时器的过期时间小于当前时间轮的当前时间+轮的总跨度,将定时器放到对应的bucket中,并将bucket放入延迟队列。
//假设过期时间为2021-07-15 15:17:02(1626333422000)
//1626333422000 < 1626333377000 + 60*1000
//virtualID = 1626333422000 / 1000 = 1626333422
//1626333422%60 = 2,将定时器放到第2个时间格中
//设置bucket(时间格)的过期时间
virtualID := t.expiration / tw.tick
b := tw.buckets[virtualID%tw.wheelSize]
b.Add(t)
if b.SetExpiration(virtualID * tw.tick) {
//如果设置的过期时间不等于桶的过期时间
//将bucket添加到延迟队列,重新排序延迟队列
tw.queue.Offer(b, b.Expiration())
}
return true
} else {
//定时器的过期时间 大于 当前时间轮的当前时间+轮的总跨度,递归将定时器添加到上一层轮。
overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
if overflowWheel == nil {
atomic.CompareAndSwapPointer(
&tw.overflowWheel,
nil,
unsafe.Pointer(newTimingWheel(
tw.interval,
tw.wheelSize,
currentTime,
tw.queue,
)),
)
overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
}
return (*TimingWheel)(overflowWheel).add(t)
}
}
// 执行已过期定时器的任务,将未到期的定时器重新放回时间轮
func (tw *TimingWheel) addOrRun(t *Timer) {
if !tw.add(t) {
go t.task()
}
}
//推进时钟
//我们就以时钟举例:假如当前时间是2021-07-15 15:16:17(1626333375000毫秒),过期时间是2021-07-15 15:17:18(1626333438000)毫秒
//从秒轮开始,1626333438000 > 1626333377000 + 1000, truncate(1626333438000,1000)=1626333438000, 秒轮的当前时间设置为1626333438000(2021-07-15 15:17:18),有上层时间轮
//到了分钟轮 1626333438000 > 1626333377000 + 60000=1626333437000, truncate(1626333438000,60000)=1626333420000(2021-07-15 15:17:00),分轮的当前时间设置为1626333438000,有上层时间轮
//到了时钟轮 1626333438000 < 1626333377000 + 360000,时钟轮当前时间不变(2021-07-15 15:16:17),没上层时间轮
func (tw *TimingWheel) advanceClock(expiration int64) {
currentTime := atomic.LoadInt64(&tw.currentTime)
if expiration >= currentTime+tw.tick {
//将过期时间截取到时间格间隔的最小整数倍
//举例:
//expiration = 100ms,tw.tick = 3ms, 结果 100 - 100%3 = 99ms,因此当前的时间来到了99ms,
//目的就是找到合适的范围,比如[0,3)、[3-6)、[6,9) expiration=5ms时,currentTime=3ms。
currentTime = truncate(expiration, tw.tick)
atomic.StoreInt64(&tw.currentTime, currentTime)
// 如果有上层时间轮,那么递归调用上层时间轮的引用
overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
if overflowWheel != nil {
(*TimingWheel)(overflowWheel).advanceClock(currentTime)
}
}
}
//时间轮转起来
func (tw *TimingWheel) Start() {
tw.waitGroup.Wrap(func() {
//开启一个协程,死循环延迟队列,将已过期的bucket(时间格)弹出
tw.queue.Poll(tw.exitC, func() int64 {
return timeToMs(time.Now().UTC())
})
})
tw.waitGroup.Wrap(func() {
for {
select {
//开启另外一个协程,阻塞接收延迟队列弹出的bucket(时间格)
case elem := <-tw.queue.C:
// 从延迟队列弹出来的是一个bucket(时间格)
b := elem.(*bucket)
// 时钟推进,将时钟的当前时间推进到过期时间
tw.advanceClock(b.Expiration())
// 将bucket(时间格)中的已到期的定时器执行,还没有到过期时间重新放回时间轮
b.Flush(tw.addOrRun)
case <-tw.exitC:
return
}
}
})
}
//停止时间轮
//关闭管道
func (tw *TimingWheel) Stop() {
close(tw.exitC)
tw.waitGroup.Wait()
}
//添加定时任务到时间轮
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
t := &Timer{
expiration: timeToMs(time.Now().UTC().Add(d)),
task: f,
}
tw.addOrRun(t)
return t
}
整体实现思路
参考
http://russellluo.com/2018/10/golang-implementation-of-hierarchical-timing-wheels.html