Golang时间轮实现 (详解版)

时间轮有单层时间轮和多层时间轮
本文仅是单层时间轮的实现
单层时间轮的概念图如下所示:
在这里插入图片描述
单层时间轮是由多个槽位组成,每个槽位维护着自己的链表
所以数据结构为:数组 + 链表 —> []*list.List

代码实现如下:
main.go

package main

import (
	"fmt"
	"time"
)

func main1() {

}

func main() {
	fmt.Println("k1 job before:", time.Now().Format("2006-01-02 15:04:05"))
	AddJob(time.Now().Add(time.Second*5), "k1", func() {
		fmt.Println("k1 job after:", time.Now().Format("2006-01-02 15:04:05"))
	})

	fmt.Println("k2 job before:", time.Now().Format("2006-01-02 15:04:05"))
	AddJob(time.Now().Add(time.Second*10), "k2", func() {
		fmt.Println("k2 job after:", time.Now().Format("2006-01-02 15:04:05"))
	})

	fmt.Println("k3 job before:", time.Now().Format("2006-01-02 15:04:05"))
	AddJob(time.Now().Add(time.Second*10), "k3", func() {
		fmt.Println("k3 job after:", time.Now().Format("2006-01-02 15:04:05"))
	})

	CancelJob("k2")
	select {}
	//time.Sleep(time.Second * 20)
}

delay.go (做了层封装,供外部使用)

package main

import "time"

var tw = NewTimeWheel(time.Second, 3600)

func init() {
	tw.Start() //启动时间轮
}

// AddJob 添加任务
func AddJob(t time.Time, key string, job func()) {
	tw.AddJob(t.Sub(time.Now()), key, job)
}

// CancelJob 取消任务
func CancelJob(key string) {
	tw.RemoveJob(key)
}

timewheel.go (时间轮具体实现)

package main

import (
	"container/list"
	"fmt"
	"log"
	"time"
)

// TimeWheel 时间轮
type TimeWheel struct {
	//属性:时间间隔、时间轮槽位数、当前所在槽位
	interval time.Duration
	slotNum  int
	currPos  int

	//属性:列表数组(当做时间轮) []*list.List
	slots []*list.List
	//属性:任务对应的节点信息 map[key]location
	m map[string]location

	//属性:添加任务chan、停止chan、移除任务chan
	addTaskChan    chan task
	stopChan       chan bool
	removeTaskChan chan string
	//属性:定时器
	timer *time.Ticker
}

// task 任务
type task struct {
	//属性:延时时间、key、job函数、属于的圈
	delay  time.Duration
	key    string
	job    func()
	circle int
}

// location 任务所处时间轮上的位置(方便移除任务)
type location struct {
	pos  int           //任务所在数组上的槽位
	elem *list.Element //任务所在list的节点
}

// NewTimeWheel 初始化
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
	tw := &TimeWheel{
		interval:       interval,
		slotNum:        slotNum,
		currPos:        0,
		slots:          make([]*list.List, slotNum), //还需初始化*list.List
		m:              make(map[string]location),
		addTaskChan:    make(chan task),
		stopChan:       make(chan bool),
		removeTaskChan: make(chan string),
	}
	tw.initList()
	return tw
}

// 初始化槽位
func (tw *TimeWheel) initList() {
	for i, _ := range tw.slots {
		tw.slots[i] = list.New()
	}
}

// Start 运行
func (tw *TimeWheel) Start() {
	//初始化定时器
	tw.timer = time.NewTicker(tw.interval)
	go tw.start()
}

func (tw *TimeWheel) start() {
	//执行循环
	for {
		select {
		case <-tw.timer.C: //定时任务
			tw.tickHandle()
		case task := <-tw.addTaskChan: //新任务队列
			tw.addRealJob(&task)
		case key := <-tw.removeTaskChan: //移除任务
			tw.removeRealJob(key)
		case <-tw.stopChan: //停止
			tw.Stop()
			return //直接退出
		}
	}
}

// 执行任务
func (tw *TimeWheel) tickHandle() {
	//获取当前槽位队列
	list := tw.slots[tw.currPos]
	//槽位自增
	tw.currPos++
	if tw.currPos == tw.slotNum-1 {
		tw.currPos = 0
	}
	//起协程执行job
	go tw.scanAndRunJob(list)
}

func (tw *TimeWheel) scanAndRunJob(l *list.List) {
	//寻找可执行的任务
	//不能用该方法遍历list.List,因为list.Remove(elem)时,elem前驱、后缀指针会置空
	//所以elem = elem.Next()会导致,elem = nil
	//for elem := l.Front(); elem != nil; elem = elem.Next() {
	for elem := l.Front(); elem != nil; {
		task := elem.Value.(*task)
		if task.key == "k2" {
			fmt.Println(task)
		}
		//对多圈的任务,进行圈数-1
		if task.circle > 0 {
			task.circle--
			elem = elem.Next()
			continue
		}
		//剩下的为当前需要执行的任务(起协程执行任务)
		go func() {
			defer func() { //用于捕获 job的panic
				if err := recover(); err != nil {
					log.Fatalf("job err: %v", err)
					return
				}
			}()
			//执行任务
			task.job()
		}()
		//保存下一个elem
		next := elem.Next()
		//将当前的移除出队列、map
		l.Remove(elem)
		if task.key != "" {
			delete(tw.m, task.key)
		}
		elem = next
	}

}

// AddJob 添加新任务(将任务添加进队列)
func (tw *TimeWheel) AddJob(delay time.Duration, key string, job func()) {
	if delay < 0 { //传入的是延时多长时间
		return
	}
	tw.addTaskChan <- task{delay: delay, key: key, job: job}
}

// 添加任务(将任务添加进时间轮上)
func (tw *TimeWheel) addRealJob(task *task) {
	pos, circle := tw.getTaskPosAndCircle(task.delay)
	task.circle = circle
	//判断是否存在,若存在,则需先删除槽位上的任务,再加入
	if task.key != "" {
		//if _, ok := tw.m[task.key]; ok {
		//	tw.removeRealJob(task.key)
		//}
		tw.removeRealJob(task.key)
		//槽位
		elem := tw.slots[pos].PushBack(task)
		//map
		tw.m[task.key] = location{
			pos:  pos,
			elem: elem,
		}
	}
	fmt.Println("addRealJob end..., key:", task.key)
}

// RemoveJob 移除(将要移除的key放入chan) -- 解耦
func (tw *TimeWheel) RemoveJob(key string) {
	if key == "" {
		return
	}
	tw.removeTaskChan <- key
}

// 移除任务(真实的从时间轮上移除)
func (tw *TimeWheel) removeRealJob(key string) {
	loc, ok := tw.m[key]
	if !ok {
		return
	}
	tw.slots[loc.pos].Remove(loc.elem) //移除槽位队列
	delete(tw.m, key)                  //移除map
}

func (tw *TimeWheel) Stop() {
	tw.timer.Stop()
}

// 获取task所处时间轮的位置和圈数
func (tw *TimeWheel) getTaskPosAndCircle(delay time.Duration) (pos int, circle int) {
	//时间轮间隔
	//需要转的圈数
	//所在时间轮槽位
	delaySecond := int(delay.Seconds())
	circleSecond := int(tw.interval.Seconds())
	circle = int(delaySecond / circleSecond / tw.slotNum)
	pos = int(tw.currPos+delaySecond/circleSecond) % tw.slotNum
	return
}