// Command main is a small demo of xmachinery scheduled tasks: it schedules a
// "countDown" task that re-registers itself with a decremented argument on
// every run and removes its own schedule once the counter reaches zero.
package main

import (
	"fmt"
	"log"

	"gitee.com/sqxwww/xmachinery"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
)

const (
	// countDownSchedulerID identifies the scheduled task so it can later be
	// replaced or removed.
	countDownSchedulerID = "countDoneScheduler"
	// countDownTaskCode is the name under which countDown is registered.
	countDownTaskCode = "countDown"
	// countDownSpec is the schedule expression handed to xmachinery.
	// NOTE(review): presumably a cron expression with a seconds field that
	// fires every two seconds — confirm against the xmachinery documentation.
	countDownSpec = "0/2 * * * * ?"
	// redisAddr is the Redis endpoint used for both the broker and the backend.
	redisAddr = "localhost:6379"
)

// server is the shared task server. It is set once in init and then used by
// main, worker and the countDown task itself.
var server *xmachinery.XServer

// init builds the task server before main runs and aborts the program if the
// server cannot be set up, instead of silently continuing without the
// countDown task registered.
func init() {
	srv, err := startServer()
	if err != nil {
		log.Fatalf("starting server: %v", err)
	}
	server = srv
}

// main schedules the initial countdown (starting at 5) and then runs the
// worker; a worker error terminates the program with a non-zero exit status.
func main() {
	scheduleCountDown(5)
	if err := worker(); err != nil {
		log.Fatalf("running worker: %v", err)
	}
}

// scheduleCountDown registers the countDown scheduled task with count as its
// single int argument. Registering under the same Id is used by countDown to
// replace the previous schedule.
//
// NOTE(review): the result of RegisterScheduledTask (if any) is not inspected
// because its signature is not visible here — check whether it returns an
// error that should be handled.
func scheduleCountDown(count int) {
	server.RegisterScheduledTask(&xmachinery.ScheduledTask{
		Id:       countDownSchedulerID,
		TaskCode: countDownTaskCode,
		Spec:     countDownSpec,
		Args:     []tasks.Arg{{Type: "int", Value: count}},
	})
}

// countDown is the task body. For a positive count it prints the value and
// re-registers the schedule with count-1; once count is zero or negative it
// removes the schedule. It always returns nil.
func countDown(count int) error {
	if count <= 0 {
		fmt.Println("removing countDoneScheduler")
		// Remove the scheduled task.
		server.RemoveScheduledTask(countDownSchedulerID)
		return nil
	}

	fmt.Println("current count is ", count)

	// Replace the scheduled task with one carrying the decremented counter.
	scheduleCountDown(count - 1)
	return nil
}

// startServer builds a machinery server backed by Redis (broker and result
// backend) with an eager lock, wraps it in an xmachinery server and registers
// the countDown task on it.
//
// It returns the ready-to-use server, or nil and a wrapped error when task
// registration fails.
func startServer() (*xmachinery.XServer, error) {
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis: &config.RedisConfig{
			MaxIdle:                3,
			IdleTimeout:            240,
			ReadTimeout:            15,
			WriteTimeout:           15,
			ConnectTimeout:         15,
			NormalTasksPollPeriod:  1000,
			DelayedTasksPollPeriod: 500,
		},
	}

	broker := redisbroker.New(cnf, redisAddr, "", "", 0)
	backend := redisbackend.New(cnf, redisAddr, "", "", 0)
	lock := eagerlock.New()

	srv := xmachinery.NewServer(machinery.NewServer(cnf, broker, backend, lock))

	// Named taskFuncs (not tasks) so the imported tasks package is not shadowed.
	taskFuncs := map[string]interface{}{
		countDownTaskCode: countDown,
	}
	if err := srv.RegisterTasks(taskFuncs); err != nil {
		return nil, fmt.Errorf("registering tasks: %w", err)
	}
	return srv, nil
}

// worker creates a worker with the consumer tag "machinery_worker" and
// concurrency argument 0 on the shared server, launches it and returns
// whatever error Launch reports.
func worker() error {
	const consumerTag = "machinery_worker"
	w := server.NewWorker(consumerTag, 0)
	return w.Launch()
}