Cron 源码阅读

robfig/cron/v3

使用 cron 可以很方便的实现一个定时任务,如下:

go get github.com/robfig/cron/v3@v3.0.0
package main

import "github.com/robfig/cron/v3"

c := cron.New()
// 添加一个任务,每 30s 执行一次
c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
// 开始执行(每个任务会在自己的 goroutine 中执行)
c.Start()

// 允许往正在执行的 cron 中添加任务
c.AddFunc("@daily", func() { fmt.Println("Every day") })

// 检查上一个和下一个任务执行的时间
inspect(c.Entries())
..
c.Stop()  // 停止调度,但正在运行的作业不会被停止

通过上面的示例,可以发现, cron 最常用的几个函数:

New()Cron.AddFunc()cronCron.Stop()
robfig/cron

这些特殊字符的含义如下:

*0 0 * 1 1 **/*/12 * * * * *,* * 5,10,15 3,4 * *-*/5 * 10-12 * * *?*
0 0 10 * * 1-5
@every @every 10m


源码概览

cron 并不是一个很大的库,核心文件与作用如下:

chain.goconstantdelay.gocron.gologger.gooption.goparser.gospec.go

1.核心数据结构和接口

type Entry truct
Entry
type EntryID int

type Entry struct {
	ID EntryID
	Schedule Schedule
	Next time.Time
	Prev time.Time
	WrappedJob Job
	Job Job
}
type Cron struct
type Cron struct {
    entries   []*Entry          // 保存了所有加入到 Cron 的作业
    chain     Chain
    stop      chan struct{}     // 接收 Stop() 信号的 chan
    add       chan *Entry       // Cron 运行过程中接收 AddJob() 信号的 chan 
    remove    chan EntryID      // 接收移除 Job 信号的 chan
    snapshot  chan chan []Entry // 快照信号
    running   bool              // 标志 Cron 是否在运行中
    logger    Logger
    runningMu sync.Mutex        // Cron 运行前需要抢占该锁,保证并发安全
    location  *time.Location
    parser    ScheduleParser    // cron 表达式的解析器
    nextID    EntryID           // 即将加入的 Job 对应的 Entry 的 ID
    jobWaiter sync.WaitGroup
}
interface
// Cron 表达式解析器接口,Parse 方法接收一个 Cron 表达式 spec,
// 返回一个解析出的 Schedule 类型对象
type ScheduleParser interface {
	Parse(spec string) (Schedule, error)
}

// Schedule 类型的对象用来表输 Job 的工作周期,它包含一个 Next() 方法,
// 用来返回 Job 下一次执行的时间
type Schedule interface {
	Next(time.Time) time.Time
}

// Job is an interface for submitted cron jobs.
type Job interface {
	Run()
}

2.对接口的实现

ScheduleParser 的实现
parser.goParser
type Parser struct {
	options ParseOption
}

func (p Parser) Parse(spec string) (Schedule, error) {...}
NewParser()
func NewParser(options ParseOption) Parser {
	optionals := 0
	if options&DowOptional > 0 {
		optionals++
	}
	if options&SecondOptional > 0 {
		optionals++
	}
	if optionals > 1 {
		panic("multiple optionals may not be configured")
	}
	return Parser{options}
}
parser.gostandardParser
var standardParser = NewParser(
	Minute | Hour | Dom | Month | Dow | Descriptor,
)

后续 Cron 所使用的就是这个解析器。

Schedule 的实现
spec.goSpecScheduleSchedule
type SpecSchedule struct {
	Second, Minute, Hour, Dom, Month, Dow uint64
	Location *time.Location
}

func (s *SpecSchedule) Next(t time.Time) time.Time {...}
Job 的实现
cron.go
type FuncJob func()

func (f FuncJob) Run() { f() }

总结

Cron 中核心数据结构的类图如下:


New()

cron.goNew()Corn
func New(opts ...Option) *Cron {
	c := &Cron{
		entries:   nil,
		chain:     NewChain(),
		add:       make(chan *Entry),
		stop:      make(chan struct{}),
		snapshot:  make(chan chan []Entry),
		remove:    make(chan EntryID),
		running:   false,
		runningMu: sync.Mutex{},
		logger:    DefaultLogger,
		location:  time.Local,
		parser:    standardParser,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

这个函数接收一组可变的 Option 类型的参数,该类型实际上是一类函数:

type Option func(*Cron)
option.goWithCronNew()Cron
c.parserstandardParserparser.goParseParseSchedleParse


AddFunc()

AddFunc()
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    // 包装
	return c.AddJob(spec, FuncJob(cmd))
}

func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}
AddFunc()AddJob()JobAddJob()standardParser.Parse()Schedule()
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
	if !c.running {
		c.entries = append(c.entries, entry)
	} else {
		c.add <- entry
	}
	return entry.ID
}
entryaddrun()


Entries() 和 Entry()

Entries()Entry(id EntryID)Entries()
func (c *Cron) Entry(id EntryID) Entry {
	for _, entry := range c.Entries() {
		if id == entry.ID {
			return entry
		}
	}
	return Entry{}
}
Entries()
func (c *Cron) Entries() []Entry {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		replyChan := make(chan []Entry, 1)
		c.snapshot <- replyChan
		return <-replyChan
	}
	return c.entrySnapshot()
}
runningMutex
entrySnapshot()
func (c *Cron) entrySnapshot() []Entry {
	var entries = make([]Entry, len(c.entries))
	for i, e := range c.entries {
		entries[i] = *e
	}
	return entries
}
c.snapshotcron.run()
case replyChan := <-c.snapshot:
    replyChan <- c.entrySnapshot()
    continue
Entries()replyChanc.snapshotrun()c.entrySnapshot()replyChanEntries()
c.entrySnapshot()


Remove()

Remove()Entries()
func (c *Cron) Remove(id EntryID) {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.remove <- id
	} else {
		c.removeEntry(id)
	}
}

func (c *Cron) removeEntry(id EntryID) {
	var entries []*Entry
	for _, e := range c.entries {
		if e.ID != id {
			entries = append(entries, e)
		}
	}
	c.entries = entries
}
run()c.remove
case id := <-c.remove:
    timer.Stop()
    now = c.now()
    c.removeEntry(id)
    c.logger.Info("removed", "entry", id)


Stop()

Stop()Stop()
func (c *Cron) Stop() context.Context {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.stop <- struct{}{}
		c.running = false
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
         // 等待所有已经在执行的作业执行完毕
		c.jobWaiter.Wait()
         // 会发出一个 cancelCtx.Done() 信号
		cancel()
	}()
	return ctx
}
Stop()ContextcancelCtxcancelCtx.Done()


Start()

Start()
func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

这个函数干了三件事:

c.runningtruec.run()runc.entries
run
func (c *Cron) run() {
	c.logger.Info("start")

    // 第一部分
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}

    // 第二部分
	for {
        // 2.1
		sort.Sort(byTime(c.entries))

        // 2.2
		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

        // 2.3
		for {
			select {}
			break
		}
	}
}

大概包含下面这几部分:

for {
    select {
        case now = <-timer.C:
            // ...
        
        case newEntry := <-c.add:
            // ...
        
        case replyChan := <-c.snapshot:
            // ...
            continue
        
        case <-c.stop:
            // ...
            return
        
        case id := <-c.remove:
           // ...
    }

    break
}

对 timer.C 的处理

case now = <-timer.C:
    now = now.In(c.location)
    c.logger.Info("wake", "now", now)

    // Run every entry whose next time was less than now
    for _, e := range c.entries {
        if e.Next.After(now) || e.Next.IsZero() {
            break
        }
        c.startJob(e.WrappedJob)
        e.Prev = e.Next
        e.Next = e.Schedule.Next(now)
        c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
    }

这个信号被触发有两种情况:

  1. 排序后 entries 中第 0 位的作业可以被执行了。
  2. 休眠了十万小时后,定时器被触发…

在处理这类信号时,run 会遍历所有的 entries, 因为这些作业都是按下一次执行时间排过序的,所以如果因为第一种情况出发了信号,说明至少有一个作业是可以执行的,我们遍历整个 entries,直到遇到一个作业可执行时间大于当前时间,说明前面遍历到的都是可以执行的,后面的都是不可以执行的;如果因为第二种情况发出来这个信号,则在第一次判断时就会 break

cron.startJob()
func (c *Cron) startJob(j Job) {
	c.jobWaiter.Add(1)
	go func() {
		defer c.jobWaiter.Done()
		j.Run()
	}()
}

这里的操作简单粗暴,直接开 goroutine 去执行,在使用时要注意定时任务一定要能结束,定时任务执行时间过长且执行速率很高时,可能造成 goroutine 泄露,进而可能导致内存溢出。

jobWaiterStop()

对 c.add 的处理

case newEntry := <-c.add:
    timer.Stop()
    now = c.now()
    newEntry.Next = newEntry.Schedule.Next(now)
    c.entries = append(c.entries, newEntry)
    c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

如果 cron 在运行的过程中有作业被加入,会停止定时器(新加入的作业需要重新进行排序),然后计算新作业的下一次执行时间(cron 未运行时添加作业没有这一步,是因为在 Start 的第一步会集中计算,集中计算结束后,进入第二步的死循环,就不会再次集中计算了),最后把新作业加入到 entries 列表中。

对 c.snapshot 的处理

case replyChan := <-c.snapshot:
    replyChan <- c.entrySnapshot()
    continue
Entries()
continuecontinuecaseselectbreakc.snapshotselectfor
c.snapshotcontinueselectrunc.addc.snapshot

对 c.stop 的处理

case <-c.stop:
    timer.Stop()
    c.logger.Info("stop")
    return
runrun()

对 c.remove 的处理

case id := <-c.remove:
    timer.Stop()
    now = c.now()
    c.removeEntry(id)
    c.logger.Info("removed", "entry", id)
c,add


Option

New()optionOption

WithLocation

time.Local
func WithLocation(loc *time.Location) Option {
	return func(c *Cron) {
		c.location = loc
	}
}

可以这样使用:

c := cron.New(cron.WithLocation(nyc))

WithSeconds

分钟 小时 日 月 星期Minute | Hour | Dom | Month | Dow
func WithSeconds() Option {
	return WithParser(NewParser(
		Second | Minute | Hour | Dom | Month | Dow | Descriptor,
	))
}

允许的字段如下:

const (
	Second         ParseOption = 1 << iota // Seconds field, default 0
	SecondOptional                         // Optional seconds field, default 0
	Minute                                 // Minutes field, default 0
	Hour                                   // Hours field, default 0
	Dom                                    // Day of month field, default *
	Month                                  // Month field, default *
	Dow                                    // Day of week field, default *
	DowOptional                            // Optional day of week field, default *
	Descriptor                             // Allow descriptors such as @monthly, @weekly, etc.
)

WithParser

如果你觉得 Cron 表达式是在难以理解,也记不住,可以写一个自己的解析器,用这个函数替代原来的解析器。

func WithParser(p ScheduleParser) Option {
	return func(c *Cron) {
		c.parser = p
	}
}

WithChain

修改默认修饰器

func WithChain(wrappers ...JobWrapper) Option {
	return func(c *Cron) {
		c.chain = NewChain(wrappers...)
	}
}

WihLogger

使用自定义的 logger

func WithLogger(logger Logger) Option {
	return func(c *Cron) {
		c.logger = logger
	}
}


Chain

这是一个很值得学习的装饰器模式,我们先看一下默认情况下,装饰器是怎么工作的:

chainNew()NewChain()
c := &Cron{
    entries:   nil,
    chain:     NewChain(),
    // ...
}
NewChain()
type Chain struct {
	wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
	return Chain{c}
}
EntryWrappedJob JobSchedule()chainThan()
entry := &Entry{
    ID:         c.nextID,
    Schedule:   schedule,
    WrappedJob: c.chain.Then(cmd),
    // ...
}
Then()
func (c Chain) Then(j Job) Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers)-i-1](j)
	}
	return j
}
Then()run()startJob()e.WrappedJobe.job
chain.go

Recover

recover()defer func(){}()
func Recover(logger Logger) JobWrapper {
	return func(j Job) Job {
		return FuncJob(func() {
			defer func() {
				if r := recover(); r != nil {
					const size = 64 << 10
					buf := make([]byte, size)
					buf = buf[:runtime.Stack(buf, false)]
					err, ok := r.(error)
					if !ok {
						err = fmt.Errorf("%v", r)
					}
					logger.Error(err, "panic", "stack", "...\n"+string(buf))
				}
			}()
			j.Run()
		})
	}
}

DelayIfStillRunning

这个装饰器的作用是保证一个 Job 的前一次执行完,后一次才执行,比如有一个 Job 需要执行 10s, 但执行频率是一秒一次,如果我们想要保证同时只有一个相同的 Job 被执行,就可以使用这个装饰器,在实现上,他是为每个 Job 添加了一个排它锁实现的,Job 执行前获取该锁,退出时释放锁,当一个 Job 等待该锁的时间大于一分钟,会记录在日志中,设计很巧妙。

func DelayIfStillRunning(logger Logger) JobWrapper {
	return func(j Job) Job {
		var mu sync.Mutex
		return FuncJob(func() {
			start := time.Now()
			mu.Lock()
			defer mu.Unlock()
			if dur := time.Since(start); dur > time.Minute {
				logger.Info("delay", "duration", dur)
			}
			j.Run()
		})
	}
}

SkipIfStillRunning

上面那个是等待执行完,这个是如果上一个还在执行,就直接跳过,在实现上,这个装饰器使用了一个容量为 1 的 chan, 在执行 Job 前,会消费 chan 里的数据,执行完后,再往 chan 里填一个数据,通过 select 监听 chan, 如果里面有数据,则执行,否则说明上一个还在执行,只打印一个日志就好了。

func SkipIfStillRunning(logger Logger) JobWrapper {
	return func(j Job) Job {
		var ch = make(chan struct{}, 1)
		ch <- struct{}{}
		return FuncJob(func() {
			select {
			case v := <-ch:
				defer func() { ch <- v }()
				j.Run()
			default:
				logger.Info("skip")
			}
		})
	}
}


总结

Cron 的几个特点:

New()OptionContext