golang基于machinery扩展支持定时任务管理和任务广播的分布式任务调度系统
package main
import (
"fmt"
"gitee.com/sqxwww/xmachinery"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
"github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
)
var server *xmachinery.XServer
func init() {
server, _ = startServer()
}
func main() {
server.RegisterScheduledTask(&xmachinery.ScheduledTask{
Id: "countDoneScheduler",
TaskCode: "countDown",
Spec: "0/2 * * * * ?",
Args: []tasks.Arg{{Type: "int", Value: 5}},
})
worker()
}
func countDown(count int) error {
if count <= 0 {
fmt.Println("removing countDoneScheduler")
//移除定时任务
server.RemoveScheduledTask("countDoneScheduler")
return nil
}
fmt.Println("current count is ", count)
count--
//替换定时任务
server.RegisterScheduledTask(&xmachinery.ScheduledTask{
Id: "countDoneScheduler",
TaskCode: "countDown",
Spec: "0/2 * * * * ?",
Args: []tasks.Arg{{Type: "int", Value: count}},
})
return nil
}
func startServer() (*xmachinery.XServer, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
broker := redisbroker.New(cnf, "localhost:6379", "", "", 0)
backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
lock := eagerlock.New()
tmp := machinery.NewServer(cnf, broker, backend, lock)
server := xmachinery.NewServer(tmp)
tasks := map[string]interface{}{
"countDown": countDown,
}
return server, server.RegisterTasks(tasks)
}
func worker() error {
consumerTag := "machinery_worker"
worker := server.NewWorker(consumerTag, 0)
return worker.Launch()
}