分布式任务调度框架基本能力:

  1. 任务管理能力(增删改查、执行、定时执行、延时执行、健康监控)
  2. 集群管理能力(扩展简单、效率高)
  3. 编程能力(运行代码)
  4. Web界面管理
Elastic-JobXXL-JobQuartzJavaGo
gocrongocraft/workrobfig/cron
microservice orchestration platform

截屏2021-09-18 13.41.19.png

ActivityWorkflowWorkersTemporal ServerCLIWebSDK
角色说明
Activity一段可运行代码(function),可以包含任何逻辑。由于Temporal提供了各种语言的SDK(go、java、python、php等等)所以Activity是不限制语言的。
WorkflowActivity的集合,多个Activity可以构成一个Workflow,也是调度的最小单位。
Workers不同语言写的Workflow可以注册到对应语言的Worker中,Worker是代码的真正执行者
Temporal管理注册到自己的Workers,向Workers下发任务,监听任务状态等等
CLI or SDK任务的发起者、监控任务进度等
Web负责任务的监控、查询等。
CLI or SDKTemporalWorkers
    1. 更关注任务的执行
        消息队列的核心是消息,生产者不关心这个消息被谁消费,消息队列关注的是消息是否送达。
        而 Temporal ,关注的是任务,关注任务执行进度和结果,是否需要重启等等。
    2. 需要提前设计好Workflow
        消息队列不需要关注消费者怎么消费消息的。而 Temporal ,你必须先把Workflow的逻辑
        写好。
Temporal

Temporal 示例

Talk is cheap, show me the code.

Temporal
Activity
type MessageRequest struct {
   PhoneNum string
   Content  string
   Tags     []string
}

func SendMessage(ctx context.Context, mr MessageRequest) (mresp MessageResponse, error) {
   fmt.Printf(
      "\nSending message to %s \n Content is %s \n ",
      mr.PhoneNum,
      mr.Content,
   )
   return nil, nil
}

type EmailRequest struct{
    From string 
    To string 
    Content stirng 
}

func SendEmail(ctx context.Context, er EmailRequest) error{
       fmt.Printf(
      "\nSending email to %s \n Content is %s \n ",
      er.To,
      er.Content,
   )
   return nil, nil
}

ActivityActivity
2. Workflow 的代码
func SendMessageWorkflow(ctx workflow.Context, msq MessageRequest, er EmailRequest) error {
   options := workflow.ActivityOptions{
      StartToCloseTimeout: time.Minute,
   }
   ctx = workflow.WithActivityOptions(ctx, options)
   
   // 设计工作流
   // 1. 先执行SendMessage 活动
   err := workflow.ExecuteActivity(ctx, SendMessage, msq).Get(ctx, nil)
   if err != nil {
      return err
   }
   // 2. 再执行 SendEmail 活动
   err = workflow.ExecuteActivity(ctx, SendEmail, msq).Get(ctx, nil)
   if err != nil {
      return err
   }
   return nil
}
WorkflowActivityWorkflowWorkflowActivity
3. 启动 Workers
func main() {
    // 连接到 Temporal Server,注册自己
   c, err := client.NewClient(client.Options{})
   if err != nil {
      log.Fatalln("unable to create Temporal client", err)
   }
   defer c.Close()
   
   w := worker.New(c, app.TaskQueue, worker.Options{})
   w.RegisterWorkflow(app.SendMessageWorkflow)
   w.RegisterActivity(app.SendMessage)
   w.RegisterActivity(app.SendEmail)
   
   // Start listening to the Task Queue
   err = w.Run(worker.InterruptCh())
   if err != nil {
      log.Fatalln("unable to start Worker", err)
   }
}
WorkerWorkflowActivityWorkerWorkerWorkerWorker
4. 启动 TemplateServer

按照官网的文档启动即可。

5. 发起任务
func main() {
   // 先连上 TemplateServer
   c, err := client.NewClient(client.Options{})
   if err != nil {
      log.Fatalln("unable to create Temporal client", err)
   }
   defer c.Close()
   
   options := client.StartWorkflowOptions{
      TaskQueue: app.TaskQueue,
   }
   r1 := app.MessageRequest{
      ...
   }
   r2 := app.EmailRequest{
      ...
   }
   we, err := c.ExecuteWorkflow(context.Background(), options, "SendMessageWorkflow", transferDetails)
   if err != nil {
      log.Fatalln("error starting SendMessageWorkflow", err)
   }
   printResults(we.GetID(), we.GetRunID())
}
WorkflowTaskQueueWorkerWorkflow

Temporal 总结

Temporal
Go