goCron是一个Golang作业调度工具,可以使用简单的语法定期执行go函数。
使用实例
package main
import (
"fmt"
"github.com/jasonlvhit/gocron"
)
func task() {
fmt.Println("I am runnning task.")
}
func taskWithParams(a int, b string) {
fmt.Println(a, b)
}
func main() {
//可并发运行多个任务
//注意 interval>1时调用sAPi
gocron.Every(2).Seconds().Do(task)
gocron.Every(1).Second().Do(taskWithParams, 1, "hi")
//在cron所有操作最后调用 start函数,否则start之后调用的操作无效不执行
//<-gocron.Start()
//在task执行过程中 禁止异常退出
gocron.Every(1).Minute().DoSafely(taskWithParams, 1, "hello")
// 支持在具体某一天、某天的某一时刻、每y-M-d h-m-s 执行任务
gocron.Every(1).Monday().Do(task)
gocron.Every(1).Thursday().Do(task)
// function At() take a string like 'hour:min'
gocron.Every(1).Day().At("10:30").Do(task)
gocron.Every(1).Monday().At("18:30").Do(task)
// 删除某一任务
gocron.Remove(task)
//删除所有任务
gocron.Clear()
//可同时创建一个新的任务调度 2个schedulers 同时执行
s := gocron.NewScheduler()
s.Every(3).Seconds().Do(task)
<-s.Start()
//防止多个集群中任务同时执行 task 实现lock接口
//两行代码,对cron 设置lock实现,执行task时调用Lock方法再Do task
gocron.SetLocker(lockerImplementation)
gocron.Every(1).Hour().Lock().Do(task)
<-gocron.Start()
}
源码浅析
轻量 简洁的链式调用,看工程源码,简洁的……只有一个类。gocron当前支持最多1w个任务数,核心对象就是维护了job对象,所有执行的任务都放在jobs数组中,start方法底层调用go time包下的NewTicker,新启一个线程执行task方法。
外部调用的gocron.start func调用链
// Start all the pending jobs
// Add seconds ticker
func (s *Scheduler) Start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
s.RunPending() //调用RunPending 执行数组有序的任务队列
case <-stopped:
ticker.Stop()
return
}
}
}()
return stopped
}
// RunPending runs all the jobs that are scheduled to run.
func (s *Scheduler) RunPending() {
runnableJobs, n := s.getRunnableJobs()
if n != 0 {
for i := 0; i < n; i++ {
runnableJobs[i].run()
}
}
}
//run方法,反射获取job的各属性,最终调用function.call方法执行任务函数
//Run the job and immediately reschedule it
func (j *Job) run() (result []reflect.Value, err error) {
if j.lock {
if locker == nil {
err = fmt.Errorf("trying to lock %s with nil locker", j.jobFunc)
return
}
key := getFunctionKey(j.jobFunc)
if ok, err := locker.Lock(key); err != nil || !ok {
return nil, err
}
defer func() {
if e := locker.Unlock(key); e != nil {
err = e
}
}()
}
f := reflect.ValueOf(j.funcs[j.jobFunc])
params := j.fparams[j.jobFunc]
if len(params) != f.Type().NumIn() {
err = errors.New("the number of param is not adapted")
return
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
result = f.Call(in)
j.lastRun = time.Now()
j.scheduleNextRun()
return
}
这里需要关注一下,gocron对lock的实现,从代码上看Job结构体的lock属性,用于控制多实例job并发执行。但项目woner提到的 multiple instances 指的并不是跨服务器的多实例,而是在同一应用服务 里的多任务实例(也就是1个app服务中多个任务,粒度是只在统一应用内)。如果跨server则lock需要自行依赖redis或其他分布式锁来管理。通过读源码的run方法,j.lock来控制job并发,但一旦跨server job.lock属性是没法共享的。这里doc上给的解释有点歧义,需要注意。
If you need to prevent a job from running at the same time from multiple cron instances (like running a cron app from multiple servers), you can provide a Locker implementation and lock the required jobs. 然后owner给出了一个基于redis来做的lock
Job结构体
// Job struct keeping information about job
type Job struct {
interval uint64 // pause interval * unit bettween runs
jobFunc string // the job jobFunc to run, func[jobFunc]
unit string // time units, ,e.g. 'minutes', 'hours'...
atTime time.Duration // optional time at which this job runs
lastRun time.Time // datetime of last run
nextRun time.Time // datetime of next run
startDay time.Weekday // Specific day of the week to start on
funcs map[string]interface{} // Map for the function task store
fparams map[string][]interface{} // Map for function and params of function
lock bool // lock the job from running at same time form multiple instances
}
scheduler内维护JOBs array数组和大小,gocron scheduler最大可执行1w个job(大小可重写)。
// Scheduler struct, the only data member is the list of jobs.
// - implements the sort.Interface{} for sorting jobs, by the time nextRun
type Scheduler struct {
jobs [MAXJOBNUM]*Job // Array store jobs ,const MAXJOBNUM = 10000
size int // Size of jobs which jobs holding.
}
对于执行时间的控制,均通过对job的unit属性进行设置,代码如下
// Seconds set the unit with seconds
func (j *Job) Seconds() *Job {
return j.setUnit("seconds")
}
// Minutes set the unit with minute
func (j *Job) Minutes() *Job {
return j.setUnit("minutes")
}
// Second set the unit with second
func (j *Job) Second() *Job {
j.mustInterval(1)
return j.Seconds()
}
// Minute set the unit with minute, which interval is 1
func (j *Job) Minute() *Job {
j.mustInterval(1)
return j.Minutes()
}
// Sunday sets the job start day Sunday
func (j *Job) Sunday() *Job {
return j.Weekday(time.Sunday)
}
// Every schedule a new periodic job with interval
func (s *Scheduler) Every(interval uint64) *Job {
job := NewJob(interval)
s.jobs[s.size] = job
s.size++
return job
}
简单总结
gocron代码总共570行,在java中但凡涉及到一点“通用工具”或“架构”的实现,除了多依赖之外,撸代码是少不了的。但在go中要实现满足基本功能的cron任务调度,没有多余依赖,纯基于gosdk本身,575行代码打完收工。这是在不接触go语言之前设想不到的事情。轻量好用,为中间件而生。
编程语言跟人类沟通语言一样,属性都是工具,透过这个工具无论作为人或是工程师,给我们打开的是另一个世界和景象。在对比中扬长避短,可对比的资源越多,越是能找到最优方案。不怕不知道,就怕不知道。