前言:

又是golang的定时器时间轮库,这是我实现的第二版时间轮了 😅 ,以前的版本不是不能用,不好用而已。主要问题是实现偏复杂,就懒得维护,索性直接重新造轮子了。

以前是如何实现的 ?以前时间轮每个格子里是一个最小堆,由于堆的时间复杂度问题,导致密集场景下cpu还是不低。当然对比golang timer要好的多。 当然也有优点,为了支持并发操作时间轮,把锁的粒度细到每个槽位,减少了锁的竞争。

第一版的代码,有兴趣可以看看,https://github.com/rfyiamcool/go-ringtimer

话说,当初为什么要在时间轮里加入最小堆,而不是双端列表或者map这类的结构。主要是考虑定时任务超过了时间轮的大小,而现在改用map来存,对于超过时间轮周期的任务设定一个round环数,每轮一圈就减 1 。

老生常谈

golang在1.9.x版本后优化了定时器,以前为一个四叉堆,现在是多个四叉堆,具体你能用到多少个堆,要看你的gomaxprocs配置大小了。go timer init里初始化了64个timerBucket,那么time会如何选择timerBucket? 是通过g.m.p.id % 64取摸拿到的timerBucket。

// xiaorui.cc

func (t *timer) assignBucket() *timersBucket {
    id := uint8(getg().m.p.ptr().id) % timersLen
    t.tb = &timers[id].timersBucket
    return t.tb
}

那么golang分了这么多时间堆是干嘛的? 主要是为了解决锁的竞争。既然做了优化,那么时间轮的意义又是什么?

golang标准库里实现的定时器都是高精度的,高精度带来了频发的读写操作heap和锁的竞争压力。

时间轮的意义在于把精度放低,比如同一秒内的定时任务放凑在一起,数据结构改用map, 那么时间轮只需要o(1)的时间复杂度就可以把任务放进去,只需要一次加锁放锁就可以把同一个时间精度的任务都捞出来。

go timewheel

timewheel的代码实现原理我这里就不细说了,源码已经推到github里了,有兴趣的朋友可以看下。https://github.com/rfyiamcool/go-timewheel

下面列下go-timewheel间轮的基本用法:

// xiaorui.cc
// https://github.com/rfyiamcool/go-timewheel/blob/master/README.md

// 初始化时间轮, 精度为1s,槽位360个
tw, err := NewTimeWheel(1 * time.Second, 360)
if err != nil {
    panic(err)
}

// 启动时间轮
tw.Start()

// 关闭时间轮
tw.Stop()

// 添加任务
task := tw.Add(5 * time.Second, func(){})

// 删除任务
tw.Remove(task)

// 添加周期性任务
task := tw.AddCron(5 * time.Second, func(){
    ...
})

// 实现time.Sleep
tw.Sleep(5 * time.Second)
similar to time.After()

// 实现After
<- tw.After(5 * time.Second)
similar to time.NewTimer

// 实现NewTimer定时器
timer :=tw.NewTimer(5 * time.Second)
<- timer.C
timer.Reset(1 * time.Second)
timer.Stop()

// 实现NewTicker定时器
timer :=tw.NewTicker(5 * time.Second)
<- timer.C
timer.Stop()


// 实现go time的AfterFunc
runner :=tw.AfterFunc(5 * time.Second, func(){})
<- runner.C
runner.Stop()

功能点实现思路

如何解决提交的定时任务小于时间轮的精度?

如果小于时间轮精度,那么就强制该任务为一个时间轮精度。

如何保证map结构的读写安全?

增删改查都在一个协程里,添加删除任务也只是添加到chan里。

如何保证调度器和任务添加在一个bucket的冲突问题?

比如调度器正在执行bucket 1,但是添加任务的槽位也是bucket 1,由于写安全,添加是在调度器执行之后,那么可以想象该任务算是被遗漏了,只有在下次轮询才能被执行。当然,还是有办法解决的。

我采用的方法跟上面一样,所有时间轮任务的管理都在一个协程里。

调度器每次执行任务是按照time.Ticker走的,如果调度器执行流发生超时?

一般是不会出现超时。但如果发生超时,会导致时间轮变慢。我们可以独立time.Ticker定时器,go默认给time.Ticker缓冲为1,我们可以多缓冲几个事件。

// xiaorui.cc


func (tw *TimeWheel) tickGenerator() {
	if tw.tickQueue != nil {
		return
	}

	for !tw.exited {
		select {
		case <-tw.ticker.C:
			select {
			case tw.tickQueue <- time.Now():
			default:
				panic("raise long time blocking")
			}
		}
	}
}

func (tw *TimeWheel) schduler() {
	queue := tw.ticker.C
	if tw.tickQueue == nil {
		queue = tw.tickQueue
	}

	for {
		select {
		case <-queue:
			tw.handleTick()
...

如何兼容了golang time定时器的实现?

通过在定时任务里加入各种功能的回调方法来实现的,这里在时间轮里实现了类似标准库的 time.After, time.Sleep, time.NewTimer, time.AfterFunc, time.Tikcer的方法。

如何解决go标准库里timer, ticker stop后,select无法接收到已经stop事件的问题

我在timer ticker里封装了context cancel,当用户触发stop的时候,可以使用ctx来得知该定时器已经被关闭。

// xiaorui.cc
// similar to golang std timer
type Timer struct {
	task *Task
	tw   *TimeWheel
	fn   func() // external custom func
	C    chan bool

	cancel context.CancelFunc
	Ctx    context.Context
}

func (t *Timer) Reset(delay time.Duration) {
	var task *Task
	if t.fn != nil { // use AfterFunc
		task = t.tw.addAny(delay,
			func() {
				t.fn()
				notfiyChannel(t.C)
			},
			modeNotCircle, modeIsAsync, // must async mode
		)
	} else {
		task = t.tw.addAny(delay,
			func() {
				notfiyChannel(t.C)
			},
			modeNotCircle, modeNotAsync)
	}

	t.task = task
}

func (t *Timer) Stop() {
	t.task.stop = true
	t.cancel()
	t.tw.Remove(t.task)
}

在时间轮里,调度器处理事件时,区分了同步和异步调用

参考了go time的实现,当定时任务类型是通知类型的,比如timer.C,那么就可以用同步调用,因为在select里实现了default且chan buf为1,所谓不会阻塞。 当进行AfterFunc时,就使用go func调用。

简单说,直接函数调用肯定要比go func快的多的多。虽然golang实现了gfree来缓存了goroutine对象,但毕竟要经过runtime的pmg调度。

// xiaorui.cc

func AfterFunc(d Duration, f func()) *Timer {
	t := &Timer{
		r: runtimeTimer{
			when: when(d),
			f:    goFunc,  // go func 异步调用
			arg:  f,
		},
	}
	startTimer(&t.r)
	return t
}

func goFunc(arg interface{}, seq uintptr) {
	go arg.(func())()
}

func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1) // 缓冲为1,提高了效率
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

func sendTime(c interface{}, seq uintptr) {
	select {
	case c.(chan Time) <- Now():
	default: // 不阻塞
	}
}

性能

没有做太深度的压力测试。添加50w个定时任务都在合理的时间内都执行完毕。

// xiaorui.cc

add timer cost:  330.263465ms
recv sig cost:  1.196655379s

如果性能还是达不到要求,可以封装多个时间轮到一个池子,类似go time标准库里timerprc的实现。

可能不会存在的问题?

如果添加的定时器超过时间轮的大小,那么我这里会给他加入一个round字段,标明他的轮数。那么问题来了,如果大量的任务超过了时间轮大小,那么一个槽位的map里除了存在正常的任务外,且会存在一些下一轮才能执行的任务。

这样会造成遍历的开销? 是有点,但时间轮的场景一般是用在时间短且任务多的超时场景里。如果任务的周期有些长,可以多实例化几组时间轮,比如精度为秒的和分钟的各一个时间轮。

结尾:

时间轮的效率确实很高,在一个高频的推送服务里会有各种定时器的使用,时间轮可以减少cpu消耗。