Cron 源码阅读
robfig/cron/v3
使用 cron 可以很方便的实现一个定时任务,如下:
go get github.com/robfig/cron/v3@v3.0.0
package main
import "github.com/robfig/cron/v3"
c := cron.New()
// 添加一个任务,每 30s 执行一次
c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
// 开始执行(每个任务会在自己的 goroutine 中执行)
c.Start()
// 允许往正在执行的 cron 中添加任务
c.AddFunc("@daily", func() { fmt.Println("Every day") })
// 检查上一个和下一个任务执行的时间
inspect(c.Entries())
..
c.Stop() // 停止调度,但正在运行的作业不会被停止
通过上面的示例,可以发现, cron 最常用的几个函数:
New()Cron.AddFunc()cronCron.Stop()
robfig/cron
这些特殊字符的含义如下:
*0 0 * 1 1 **/*/12 * * * * *,* * 5,10,15 3,4 * *-*/5 * 10-12 * * *?*
0 0 10 * * 1-5
@every @every 10m
源码概览
cron 并不是一个很大的库,核心文件与作用如下:
chain.goconstantdelay.gocron.gologger.gooption.goparser.gospec.go
1.核心数据结构和接口
type Entry truct
Entry
type EntryID int
type Entry struct {
ID EntryID
Schedule Schedule
Next time.Time
Prev time.Time
WrappedJob Job
Job Job
}
type Cron struct
type Cron struct {
entries []*Entry // 保存了所有加入到 Cron 的作业
chain Chain
stop chan struct{} // 接收 Stop() 信号的 chan
add chan *Entry // Cron 运行过程中接收 AddJob() 信号的 chan
remove chan EntryID // 接收移除 Job 信号的 chan
snapshot chan chan []Entry // 快照信号
running bool // 标志 Cron 是否在运行中
logger Logger
runningMu sync.Mutex // Cron 运行前需要抢占该锁,保证并发安全
location *time.Location
parser ScheduleParser // cron 表达式的解析器
nextID EntryID // 即将加入的 Job 对应的 Entry 的 ID
jobWaiter sync.WaitGroup
}
interface
// Cron 表达式解析器接口,Parse 方法接收一个 Cron 表达式 spec,
// 返回一个解析出的 Schedule 类型对象
type ScheduleParser interface {
Parse(spec string) (Schedule, error)
}
// Schedule 类型的对象用来表输 Job 的工作周期,它包含一个 Next() 方法,
// 用来返回 Job 下一次执行的时间
type Schedule interface {
Next(time.Time) time.Time
}
// Job is an interface for submitted cron jobs.
type Job interface {
Run()
}
2.对接口的实现
ScheduleParser 的实现
parser.goParser
type Parser struct {
options ParseOption
}
func (p Parser) Parse(spec string) (Schedule, error) {...}
NewParser()
func NewParser(options ParseOption) Parser {
optionals := 0
if options&DowOptional > 0 {
optionals++
}
if options&SecondOptional > 0 {
optionals++
}
if optionals > 1 {
panic("multiple optionals may not be configured")
}
return Parser{options}
}
parser.gostandardParser
var standardParser = NewParser(
Minute | Hour | Dom | Month | Dow | Descriptor,
)
后续 Cron 所使用的就是这个解析器。
Schedule 的实现
spec.goSpecScheduleSchedule
type SpecSchedule struct {
Second, Minute, Hour, Dom, Month, Dow uint64
Location *time.Location
}
func (s *SpecSchedule) Next(t time.Time) time.Time {...}
Job 的实现
cron.go
type FuncJob func()
func (f FuncJob) Run() { f() }
总结
Cron 中核心数据结构的类图如下:
New()
cron.goNew()Corn
func New(opts ...Option) *Cron {
c := &Cron{
entries: nil,
chain: NewChain(),
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan chan []Entry),
remove: make(chan EntryID),
running: false,
runningMu: sync.Mutex{},
logger: DefaultLogger,
location: time.Local,
parser: standardParser,
}
for _, opt := range opts {
opt(c)
}
return c
}
这个函数接收一组可变的 Option 类型的参数,该类型实际上是一类函数:
type Option func(*Cron)
option.goWithCronNew()Cron
c.parserstandardParserparser.goParseParseSchedleParse
AddFunc()
AddFunc()
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
// 包装
return c.AddJob(spec, FuncJob(cmd))
}
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
AddFunc()AddJob()JobAddJob()standardParser.Parse()Schedule()
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
if !c.running {
c.entries = append(c.entries, entry)
} else {
c.add <- entry
}
return entry.ID
}
entryaddrun()
Entries() 和 Entry()
Entries()Entry(id EntryID)Entries()
func (c *Cron) Entry(id EntryID) Entry {
for _, entry := range c.Entries() {
if id == entry.ID {
return entry
}
}
return Entry{}
}
Entries()
func (c *Cron) Entries() []Entry {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
replyChan := make(chan []Entry, 1)
c.snapshot <- replyChan
return <-replyChan
}
return c.entrySnapshot()
}
runningMutex
entrySnapshot()
func (c *Cron) entrySnapshot() []Entry {
var entries = make([]Entry, len(c.entries))
for i, e := range c.entries {
entries[i] = *e
}
return entries
}
c.snapshotcron.run()
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
Entries()replyChanc.snapshotrun()c.entrySnapshot()replyChanEntries()
c.entrySnapshot()
Remove()
Remove()Entries()
func (c *Cron) Remove(id EntryID) {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.remove <- id
} else {
c.removeEntry(id)
}
}
func (c *Cron) removeEntry(id EntryID) {
var entries []*Entry
for _, e := range c.entries {
if e.ID != id {
entries = append(entries, e)
}
}
c.entries = entries
}
run()c.remove
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
Stop()
Stop()Stop()
func (c *Cron) Stop() context.Context {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.stop <- struct{}{}
c.running = false
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
// 等待所有已经在执行的作业执行完毕
c.jobWaiter.Wait()
// 会发出一个 cancelCtx.Done() 信号
cancel()
}()
return ctx
}
Stop()ContextcancelCtxcancelCtx.Done()
Start()
Start()
func (c *Cron) Start() {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
return
}
c.running = true
go c.run()
}
这个函数干了三件事:
c.runningtruec.run()runc.entries
run
func (c *Cron) run() {
c.logger.Info("start")
// 第一部分
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 第二部分
for {
// 2.1
sort.Sort(byTime(c.entries))
// 2.2
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
// 2.3
for {
select {}
break
}
}
}
大概包含下面这几部分:
for {
select {
case now = <-timer.C:
// ...
case newEntry := <-c.add:
// ...
case replyChan := <-c.snapshot:
// ...
continue
case <-c.stop:
// ...
return
case id := <-c.remove:
// ...
}
break
}
对 timer.C 的处理
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
这个信号被触发有两种情况:
- 排序后 entries 中第 0 位的作业可以被执行了。
- 休眠了十万小时后,定时器被触发…
在处理这类信号时,run 会遍历所有的 entries, 因为这些作业都是按下一次执行时间排过序的,所以如果因为第一种情况出发了信号,说明至少有一个作业是可以执行的,我们遍历整个 entries,直到遇到一个作业可执行时间大于当前时间,说明前面遍历到的都是可以执行的,后面的都是不可以执行的;如果因为第二种情况发出来这个信号,则在第一次判断时就会 break
cron.startJob()
func (c *Cron) startJob(j Job) {
c.jobWaiter.Add(1)
go func() {
defer c.jobWaiter.Done()
j.Run()
}()
}
这里的操作简单粗暴,直接开 goroutine 去执行,在使用时要注意定时任务一定要能结束,定时任务执行时间过长且执行速率很高时,可能造成 goroutine 泄露,进而可能导致内存溢出。
jobWaiterStop()
对 c.add 的处理
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
如果 cron 在运行的过程中有作业被加入,会停止定时器(新加入的作业需要重新进行排序),然后计算新作业的下一次执行时间(cron 未运行时添加作业没有这一步,是因为在 Start 的第一步会集中计算,集中计算结束后,进入第二步的死循环,就不会再次集中计算了),最后把新作业加入到 entries 列表中。
对 c.snapshot 的处理
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
Entries()
continuecontinuecaseselectbreakc.snapshotselectfor
c.snapshotcontinueselectrunc.addc.snapshot
对 c.stop 的处理
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
runrun()
对 c.remove 的处理
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
c,add
Option
New()optionOption
WithLocation
time.Local
func WithLocation(loc *time.Location) Option {
return func(c *Cron) {
c.location = loc
}
}
可以这样使用:
c := cron.New(cron.WithLocation(nyc))
WithSeconds
分钟 小时 日 月 星期Minute | Hour | Dom | Month | Dow
func WithSeconds() Option {
return WithParser(NewParser(
Second | Minute | Hour | Dom | Month | Dow | Descriptor,
))
}
允许的字段如下:
const (
Second ParseOption = 1 << iota // Seconds field, default 0
SecondOptional // Optional seconds field, default 0
Minute // Minutes field, default 0
Hour // Hours field, default 0
Dom // Day of month field, default *
Month // Month field, default *
Dow // Day of week field, default *
DowOptional // Optional day of week field, default *
Descriptor // Allow descriptors such as @monthly, @weekly, etc.
)
WithParser
如果你觉得 Cron 表达式是在难以理解,也记不住,可以写一个自己的解析器,用这个函数替代原来的解析器。
func WithParser(p ScheduleParser) Option {
return func(c *Cron) {
c.parser = p
}
}
WithChain
修改默认修饰器
func WithChain(wrappers ...JobWrapper) Option {
return func(c *Cron) {
c.chain = NewChain(wrappers...)
}
}
WihLogger
使用自定义的 logger
func WithLogger(logger Logger) Option {
return func(c *Cron) {
c.logger = logger
}
}
Chain
这是一个很值得学习的装饰器模式,我们先看一下默认情况下,装饰器是怎么工作的:
chainNew()NewChain()
c := &Cron{
entries: nil,
chain: NewChain(),
// ...
}
NewChain()
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
EntryWrappedJob JobSchedule()chainThan()
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
// ...
}
Then()
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}
Then()run()startJob()e.WrappedJobe.job
chain.go
Recover
recover()defer func(){}()
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
})
}
}
DelayIfStillRunning
这个装饰器的作用是保证一个 Job 的前一次执行完,后一次才执行,比如有一个 Job 需要执行 10s, 但执行频率是一秒一次,如果我们想要保证同时只有一个相同的 Job 被执行,就可以使用这个装饰器,在实现上,他是为每个 Job 添加了一个排它锁实现的,Job 执行前获取该锁,退出时释放锁,当一个 Job 等待该锁的时间大于一分钟,会记录在日志中,设计很巧妙。
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
})
}
}
SkipIfStillRunning
上面那个是等待执行完,这个是如果上一个还在执行,就直接跳过,在实现上,这个装饰器使用了一个容量为 1 的 chan, 在执行 Job 前,会消费 chan 里的数据,执行完后,再往 chan 里填一个数据,通过 select 监听 chan, 如果里面有数据,则执行,否则说明上一个还在执行,只打印一个日志就好了。
func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var ch = make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
select {
case v := <-ch:
defer func() { ch <- v }()
j.Run()
default:
logger.Info("skip")
}
})
}
}
总结
Cron 的几个特点:
New()OptionContext