延时应用场景
之前的文章分享了分布式任务调度系统负载均衡方案:分布式任务调度系统分发及负载均衡实现方案。
一个完整的任务调度系统,对延时任务的支持必不可少。延时任务、延迟消息、延迟队列基本语境和实现类似,那么它有哪些适用场景呢?最常见的如:用户下单xx分钟内未付款订单自动取消,释放库存;订单发货后xx天自动确认收货;订单结束后xx天自动评价;用户注册后1min内触发xx动作等。
延时解决方案
延时作为常见的需求自然有众多解决方案,数据库轮询是最容易想到的一个方案,时间轮,小顶堆,有序链表,延时队列以及各类开源项目也是琳琅满目。了解每种解决方案的原理以及优缺点,可以帮助在生产中做好技术选型。
1.数据库轮询
最简单且容易想到的方案是后台启动定时脚本,定时轮询扫描数据库获取满足条件数据并处理,这种方案实现简单有效。
时间处理精度问题,linux系统crontab最小是1分钟,如果需要更细时间粒度可以通过脚本for{}无限循环轮询数据库,总执行时间为50秒,每次轮询后sleep10秒,类似操作可达成更小时间粒度。
此方案项目初级比较有效,但也有较多弊端:
-
轮询粒度不好把控,轮询间隔时间过长影响精准度,过短又会产生大量不必要的数据库扫描,增加数据库压力;
-
随着数据量增大此方案存在较大性能瓶颈;
-
延时任务过多也会造成定时脚本不易维护。
RabbitMQ本身不支持延时消息,但可通过死信队列及死信路由设置间接达成。
TTL(Time to live)分消息TTL和队列TTL,控制消息超时时间,消息在队列中生存时间一旦超过TTL设置时间即成为dead letter(死信),然后通过Dead letter exchange死信路由交换机来重新路由消息。
方案分析
利用成熟RabbitMQ消息组件,稳定、易扩展、支持分布式,消息支持持久化可靠性好。但消息的延时时间需要保持一致,死信队列还是先进先出,如果先进的队列由于未到执行时间会阻塞所有后入消息,因此一种延时时间需要建一套路由。
除死信队列方案外还有一些RabbitMQ的插件可以实现延时,具体可下载插件:
rabbitmq_delayed_message_exchange
2.2 RocketMQRocketMQ是支持延时消息的,且足够高效可靠,但延迟消息的时间不是任意时间,而是仅支持18个固定的时间段,这里不再赘述。
3.时间轮算法
时间轮算法是实现延时最常用的算法,这里重点介绍它的实现方案。
3.1实现原理可以想象一个时钟的表盘,有一个指针绕着转动,每走一个格子称为一个刻度(时间间隔interval),表盘每个格子上挂载待执行任务列表(任务桶buckets),指针转动一圈总长度(bucketSize),这些元素构成一个时间轮。
如果刻度是1s,总长度是60s,那么转一圈就是1分钟,可以实现1分钟内的延时。要实现更长时间跨度,可将总长度设置更大,但这会造成占用内存过大,更多空转浪费资源。有两种优化方案,使用多层时间轮或多级时间轮。
-
多层时间轮就是增加圈数circle,一圈代表60s,那么10圈就是10分钟。
-
多级时间轮可以想象成时钟的时针、分针、秒针,一级到达后执行二级,再到三级,直到满足执行任务。
3.2具体代码
定义时间轮结构如下:
type TimeWheel struct { ticker *time.Ticker //ticker interval time.Duration //time duration of moving one slot. buckets []*list.List //bucket list bucketSize int //total size of bucket currentPos int //current position in buckets callbackFunc func(interface{}) //execute func stopChannel chan bool //stop the ticker channel }
定时器触发使用time.Ticker,它是Go自身实现的内置定时器,基于最小堆结构实现。Buckets存放任务列表,使用双向链表container/list结构,注意它非线程安全。
新建一个时间轮实例:
//create timewheel instance func New(interval time.Duration, bucketSize int, callbackFunc func(interface{})) (*TimeWheel, e rror) { if interval <= 0 || bucketSize <= 0 || callbackFunc == nil { return nil, errors.New("create timewheel instance fail") } tw := &TimeWheel{ interval: interval, buckets: make([]*list.List, bucketSize), bucketSize: bucketSize, currentPos: 0, callbackFunc: callbackFunc, stopChannel: make(chan bool), } //init bucket,every bucket will have a list for i := 0; i < bucketSize; i++ { tw.buckets[i] = list.New() } return tw, nil }
定义任务Task结构体,并添加任务。为了构造多层时间轮,给任务添加circle代表该任务在第几圈。pos代表任务在当前表盘上的位置。
//define task type Task struct { Id interface{} //task id global uniqueness Data interface{} //data of task Delay time.Duration //delay time, 30 means after 30 second Circle int //task position in timewheel } //add task func (tw *TimeWheel) AddTask(task *Task) { delaySeconds := int(task.Delay.Seconds()) intervalSeconds := int(tw.interval.Seconds()) circle := int(delaySeconds / intervalSeconds / tw.bucketSize) pos := int(tw.currentPos+delaySeconds/intervalSeconds) % tw.bucketSize task.Circle = circle tw.buckets[pos].PushBack(task) }
启动时间轮,每经过一刻度(这个刻度可以是1s、5s任意),做一次检查,如果当前格里有任务则取出执行,碰到多圈任务将circle-1。当指针走到末尾代表走完一圈,会重置再从头执行。
//start timewheel func (tw *TimeWheel) Start() { //add ticker tw.ticker = time.NewTicker(tw.interval) //receive chan go func() { for { select { case <-tw.ticker.C: //reach a tick log.Println("1 tick") tw.tickHandler() case <-tw.stopChannel: //true tw.ticker.Stop() //stop the ticker return } } }() } //1 tick handler func (tw *TimeWheel) tickHandler() { bucket := tw.buckets[tw.currentPos] for e := bucket.Front(); e != nil; { task := e.Value.(*Task) //e.value is a task if task.Circle > 0 { task.Circle-- e = e.Next() continue } //do task go tw.callbackFunc(task.Data) //remove e next := e.Next() bucket.Remove(e) e = next } //finish 1 circle,reset if tw.currentPos == tw.bucketSize-1 { log.Println("new circle") tw.currentPos = 0 } else { tw.currentPos++ } }
测试时间轮一圈10s,间隔刻度1s,添加延时12s的延时任务,第13s后执行任务。
func TestTimeWheel(t *testing.T) { tw, err := New(1*time.Second, 10, func(data interface{}) { log.Println("do task", data) }) if err != nil { t.Error(err) } log.Println("start timewheel...") tw.Start() task := Task{Id: 1, Data: "test1", Delay: 12 * time.Second} tw.AddTask(&task) time.Sleep(20 * time.Second) }
执行效果:
3.3 更多细节考虑
3.3.1 长时间跨度的解决方案
由于时间跨度越大轮子越大,会占用更多内存,所以可以考虑采用磁盘文件+内存时间轮相结合的方案。内存时间轮只加载1小时的任务,磁盘文件可以时间命名(2020101721代表2020年10月17日21:00-21:59:59所有延时任务),每小时一个文件,一天24个,一般情况不会保存太多文件。
3.3.2 内存时间轮的高可用性
因为采用内存时间轮,如果程序崩溃会导致数据丢失。将时间轮持久化保存成文件存储,到达时间后预加载到内存,程序崩溃、重启后也可以重新加载,文件保存可保障数据不会丢失,当然也可保存在redis或其他持久化存储中。
除内存时间轮外也可以直接使用redis的list结构替代container/list,redis的string结构保存时间轮当前指针。
考虑恢复时间轮后需要确认哪些未执行,那么可以在执行的时候记录成功执行日志或记录执行位置偏移。考虑是否执行成功,按at least once语义可以再发送/执行一次,需要下游保障幂等。3.3.3 任务执行方式callback如果仅是发送消息等毫秒级完成还可以,如果是执行http/rpc调用且较慢将会拖垮整个延时任务系统,所以不要在callback做重任务,可以将到达延时的任务统一放到待发送MQ中,异步执行。3.3.4 分布式集群任务分发单个时间轮处理任务能力有限,任务量大可以对任务数据分片处理,开启多个时间轮并行处理。在任务添加时,根据Id取模或hash分片,保存在不同的时间轮文件中。如每小时再分10个任务片,分别由10个时间轮加载。2020101721_0 2020101721_1 2020101721_2 ... 2020101721_9
3.4 方案分析
时间轮方案执行效率高,时间精度高,但内存时间轮重启或宕机后需要考虑持久化和消费标记,集群扩展实现也较复杂。
4.排序链表算法要使用排序链表数据结构,最先想到的就是redis的sorted set结构,这里以redis有序集合为基础来实现延时。
4.1 实现原理
redis有序集合zset结构是一个有序链表,可以通过zadd向链表添加元素,并将其score设置为延时任务执行的时间戳,值设为任务id。然后通过zrange获取链表第一个元素(默认是score最小元素),通过判断score和当前时间大小,决定是否到达执行时间。
按时间轮设计思想定义一个带定时器的结构体:
//define bucket ticker type BucketTicker struct { Ticker *time.Ticker Interval time.Duration Name string CallbackFunc func(interface{}) bool } //new ticker func New(interval time.Duration, bucketName string, callbackFunc func(interface{}) bool) (*Buck etTicker, error) { if interval <= 0 || callbackFunc == nil { return nil, errors.New("create bucket ticker instance fail") } bucket := &BucketTicker{ Interval: interval, Name: bucketName, CallbackFunc: callbackFunc, } return bucket, nil }
定义任务及添加方法,将任务的执行时间(当前时间+延时时间)和任务唯一Id存到zset结构中,将任务主体序列化存到kv结构(string)中。
//define task type Task struct { Id string //task id global uniqueness Data interface{} //data of task Delay time.Duration //delay time, 30 means after 30 second Timestamp int } //add task func (bucket *BucketTicker) AddTask(task *Task) error { //task id and delay time in redis zset timestamp := time.Now().Add(task.Delay).Unix() err := redisclient.ZAdd(bucket.Name, int(timestamp), task.Id) if err != nil { return err } //task body in redis string data, err := json.Marshal(task) if err != nil { return err } err = redisclient.Set(task.Id, string(data)) if err != nil { return err } return nil }
启动定时器,每隔一个刻度,检查是否有满足执行时间的任务。间隔时间越长,可以减少与redis查询频率,但延时任务处理精度会降低。
func (bucket *BucketTicker) Start() { timer := time.NewTicker(bucket.Interval) //interval go func() { for { select { case t := <-timer.C: log.Println("1 tick") bucket.tickHandler(t, bucket.Name) } } }() } //tick handler func (bucket *BucketTicker) tickHandler(currentTime time.Time, bucketName string) { for { task, err := getTask(bucketName) if err != nil { log.Println("error happen!", err) return } if task == nil { //no task return } //not arrival execution time if task.Timestamp > int(currentTime.Unix()) { return } //do task taskDetail, err := getTaskDetail(task.Id) if err != nil { //retry log.Println("error happen!", err) continue } //if callback success, remove finish task if ok := bucket.CallbackFunc(taskDetail.Data); ok { err = removeTask(bucketName, task.Id) if err != nil { continue } } else { log.Println("error happen!", errors.New("callback error")) continue //retry } return } }
getTask(),getTaskDetail()和removeTask()分别执行Redis操作。
//get task from redis zset func getTask(bucketName string) (*Task, error) { value, err := redisclient.ZRangeFirst(bucketName) //ZRANGE key 0 0 WITHSCORES if err != nil { return nil, err } if value == nil { return nil, nil } timestamp := int(value[0].(float64)) taskId := value[1].(string) task := Task{ Id: taskId, Timestamp: timestamp, } return &task, nil } //get task detail by taskId func getTaskDetail(taskId string) (*Task, error) { v, err := redisclient.Get(taskId) if err != nil { return nil, err } if v == "" { return nil, nil } task := Task{} err = json.Unmarshal([]byte(v), &task) if err != nil { return nil, err } return &task, nil } //remove the task func removeTask(bucketName string, taskId string) error { err := redisclient.ZRem(bucketName, taskId) if err != nil { return err } err = redisclient.Del(taskId) if err != nil { return err } return nil }
编写测试用例测试,添加2个延时任务分别是延时5秒和延时8秒。
func TestRedisDelay(t *testing.T) { delay, err := New(1*time.Second, "test", func(data interface{}) bool { log.Println("do task ", data) return true }) if err != nil { t.Error(err) } log.Println("start ticker...") delay.Start() task1 := Task{Id: "1", Data: "task1", Delay: 5 * time.Second} task2 := Task{Id: "2", Data: "task2", Delay: 8 * time.Second} delay.AddTask(&task1) delay.AddTask(&task2) time.Sleep(10 * time.Second) }
执行效果如下:
4.3 分布式集群任务分片
当有更多延时任务时,考虑存储多个bucket,每个bucket有自己的定时器,执行自己的任务列表。当有任务添加时,轮询加入不同bucket中。
4.4 方案分析由于依赖比较成熟的组件redis,高可用程序挂掉重启后仍可继续处理,集群分片拓展也容易。但由于每次都取出数据比对score,会有频繁Redis IO操作,造成较大的资源浪费。
5.总结延时方案方案除上述几种外还有最小堆的形式,文中提到的Go内置定时器即采用四叉堆结构,其实现原理与排序链表大同小异。
选择何种方案根据业务场景和业务规模而定。数据库轮询方案简单实用,在业务初期非常合适。延时队列方案实现简单,可以结合队列一起使用。当这些都不能满足业务时,再考虑自建延时系统,可以采用时间轮方案或有序链表方案。文章相关代码请关注公众号 “技术岁月”,发送关键字“延时任务”获取。