导语

在Golang任务队列machinery使用与源码剖析(一)一文中,我们主要对golang中任务队列machinery的设计结构以及具体模块的功能与源码实现进行了详细介绍。在了解了基本工作原理之后,本篇系列之二我们将从使用的角度,同时结合源码继续对machinery进一步介绍。

配置方案

为了掌握如何使用machinery,本文将会同时从调用代码和machinery源码来进行详细介绍,同时将会在每份代码段的初始部分分别标识出。在我们对使用方法进行介绍之前,首先通过machinery的启动配置文件来一探是如何衔接起各个工作模块的。在machinery中,支持两种配置方式,分别是:

  • 基于yaml文件的配置
  • 基于环境变量的配置

基于配置文件

对于配置文件,machinery中支持的格式为yaml,下面是一个基本的machinery的配置文件:

# machinery.yaml
broker: 'redis://123456@localhost:6379'
#broker: 'amqp://guest:guest@localhost:5672/'
#broker: 'https://sqs.us-west-2.amazonaws.com/123456789012'

default_queue: 'machinery_tasks'

result_backend: 'redis://123456@localhost:6379'
#result_backend: 'memcache://localhost:11211'
#result_backend: 'mongodb://localhost:27017'

results_expire_in: 36000

amqp:
  binding_key: machinery_task
  exchange: machinery_exchange
  exchange_type: direct
  prefetch_count: 3

其中,

  • broker:broker的地址,可以根据实际使用的存储介质,分别指定Redis、AMQP或AWS SQS;
  • default_queue:broker默认存放任务的队列名称;
  • result_backend:backend配置,用来指定存放结果的介质的配置。可以根据需求,分别制定Redis、memchache或mongodb等;
  • results_expire_in:任务执行结果记录留存于backend保留时间,单位为秒;
  • amqp:为当我们使用AMQP是的详细配置信息;

上面展示的,是一个基本版的配置文件,而machinery中的所有配置,我们可以通过下方的数据结构来了解:

// 源码
// machinery完整的配置结构
type Config struct {
	Broker          string       `yaml:"broker" envconfig:"BROKER"`
	DefaultQueue    string       `yaml:"default_queue" envconfig:"DEFAULT_QUEUE"`
	ResultBackend   string       `yaml:"result_backend" envconfig:"RESULT_BACKEND"`
	ResultsExpireIn int          `yaml:"results_expire_in" envconfig:"RESULTS_EXPIRE_IN"`
	AMQP            *AMQPConfig  `yaml:"amqp"`
	SQS             *SQSConfig   `yaml:"sqs"`
	Redis           *RedisConfig `yaml:"redis"`
	TLSConfig       *tls.Config
	// NoUnixSignals - when set disables signal handling in machinery
	NoUnixSignals bool            `yaml:"no_unix_signals" envconfig:"NO_UNIX_SIGNALS"`
	DynamoDB      *DynamoDBConfig `yaml:"dynamodb"`
}

调用yaml文件的配置方式,通过NewFromYaml()接口来完成,NewFromYaml()的基本逻辑中,主要是实现了一个加载配置的函数和reload配置的逻辑,而默认reload间隔reloadDelay为10s:

// 源码
func NewFromYaml(cnfPath string, keepReloading bool) (*Config, error) {
   cnf, err := fromFile(cnfPath)
   if err != nil {
      return nil, err
   }
   if keepReloading {
      // 通过go程,实现定时reload配置
      go func() {
         for {
            // Delay after each request
            time.Sleep(reloadDelay)
            // Attempt to reload the config
            newCnf, newErr := fromFile(cnfPath)
            if newErr != nil {
               log.WARNING.Printf("Failed to reload config from file %s: %v", cnfPath, newErr)
               continue
            }
            *cnf = *newCnf
         }
      }()
   }
   return cnf, nil
}

基于环境变量

上面提到,machinery的配置文件方式只支持yaml,然而并不是所有项目都是采用了yaml文件,例如我们目前项目中配置文件普遍使用了toml文件。这样,为了使用machinery就必须在一个项目出现了两个配置文件(项目的toml文件和machinery的yaml文件)。然而,做技术的一般都是有强迫症的,两个配置文件存在于一个项目的确是一个十分ugly的现象,为了解决这一问题,machinery的环境变量配置模式的作用就体现出来了。

基于环境变量的源码与基于配置的结构类似,不在此列出。由于支持了基于环境变量的配置初始化,那么,我们可以将所有的配置均放在同一个配置文件中(对于我们的项目来说,即为toml文件),并在项目init阶段,将machinery的配置从toml文件中全部加载到临时环境变量并读取,从而曲线救国,解决了多配置文件的问题,简单示例如下。

// 调用代码
//首先将配置文件加载到内存appConfig结构中
// (具体加载到内存中的方法很多,每个项目都有自己的方式,略去)

//然后将相关配置加载到环境变量
os.Setenv("BROKER", brokerValue)
os.Setenv("REDIS_MAX_IDLE", strconv.Itoa(appConfig.GetInt("redis.maxidle"))) //从toml中读取参数
os.Setenv("REDIS_MAX_ACTIVE", strconv.Itoa(appConfig.GetInt("redis.maxactive")))
os.Setenv("REDIS_IDLE_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.idletimeout")))
os.Setenv("REDIS_READ_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.readtimeout")))
os.Setenv("REDIS_WRITE_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.writetimeout")))
os.Setenv("REDIS_CONNECT_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.connecttimeout")))

//通过NewFromEnvironment()函数加载环境变量
config.NewFromEnvironment(false)

任务

从框架设计到逻辑原理,说了这么久的任务队列,任务,才是所有任务队列中的最基础的元素。从代码层面来看,一个任务就是一个执行函数。结合上篇文章中介绍过的machinery架构,我们可以知道在 machinery中,一个典型的处理流程,即为:

  • 任务创建
  • 任务注册
  • 任务发布
  • 任务执行
  • 结果获取

在下面的篇幅中,我们分别就上述步骤,从使用方法和原理同时解析,来详细介绍在machinery中如何操作和实现这几个过程。

结构

在详细介绍machinery如何处理一次任务流程之前,我们需要首先知道,究竟在machinery中一个任务的数据结构是什么样子的。在machinery的源码世界里,我们可以理解将一个任务称作为Signature,其数据结构如下:

// 源码
type Signature struct {
        UUID           string
        Name           string
        RoutingKey     string
        ETA            *time.Time
        GroupUUID      string
        GroupTaskCount int
        Args           []Arg
        Headers        Headers
        Immutable      bool
        RetryCount     int
        RetryTimeout   int
        OnSuccess      []*Signature
        OnError        []*Signature
        ChordCallback  *Signature
}

我们介绍几个有意义的参数:

  • UUID,任务的unique ID,可以主动设定也可以由系统自行设定;
  • Name,任务的名称,用于识别任务;
  • RoutingKey,根据这个key,用于将任务扔到一个正确的队列中;
  • ETA,专用于延时任务,若该参数为nil,说明需要立即将该任务扔给worker,否则,在参数数值到来之前,该任务将一直delay;
  • GroupUUID和GroupTaskCount,用于workflow中的Group分组任务创建
  • Args,任务传递给worker时的参数列表
  • Headers,用于tracing
  • RetryCount和RetryTimeout,用于实现任务的重试机制
  • Immutable,该参数可以控制任务之间是否需要参数传递
  • OnSuccess和OnError,实现回调,workflow中的链式任务(chain),就是通过OnSuccess来实现链式调度
  • ChordCallback,workflow中的chord模式,在group内所有任务全部执行完成后,进行callback。

创建任务

任务的创建在machinery中十分简单,其实就是生成一个Signature的实例,我们看下面的示例代码中,我们创建了一个名为 “audience.DownloadFromCos”的任务,指定了该任务的重试次数为2,重试超时时间为3s,同时设定了两个参数到Args中。

// 业务代码
const(
    TASK_RETRY_COUNT = 2
    TASK_RETRY_TIMEOUT = 3
)

...

func buildDownloadFromCos(appId, bucketName string) tasks.Signature {
	return tasks.Signature{
		Name: "audience.DownloadFromCos",
		RetryCount: TASK_RETRY_COUNT,
		RetryTimeout: TASK_RETRY_TIMEOUT,
		Args: []tasks.Arg{
			{
				Type: "string",
				Value: appId,
			},
			{
				Type: "string",
				Value: bucketName,
			},
		},
	}
}

注册任务

当任务创建完毕后,我们需要将任务注册到broker中才可供worker识别以调用。machinery中提供了一个RegisterTasks()函数来接受任务的注册,下面的代码中,我们将上述的DownloadFromCos()和另一个ParseCosFile()任务分别以名称为“audience.DownloadFromCos”和“audience.ParseCosFile”注册到machinery中:

// 调用代码
// 注册任务
tasks := map[string]interface{}{
   "audience.DownloadFromCos": DownloadFromCos,
   "audience.ParseCosFile": ParseCosFile,
 }
server.RegisterTasks(tasks)

RegisterTasks()会将所有的tasks加载到成员变量registeredTasks(同样为一个map类型)中,同时将通过调用系列一文中提到的broker接口SetRegisteredTaskNames(),将tasks名称注册到broker中,从而在broker之后接收到待处理任务之后可以判断是否为合法已注册任务:

// 源码
func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error {
	for _, task := range namedTaskFuncs {
		if err := tasks.ValidateTask(task); err != nil {
			return err
		}
	}
	server.registeredTasks = namedTaskFuncs
	
	server.broker.SetRegisteredTaskNames(server.GetRegisteredTaskNames())
	return nil
}

...

// SetRegisteredTaskNames函数
func (b *Broker) SetRegisteredTaskNames(names []string) {
	b.registeredTaskNames = names
}

发布任务

通过发布任务,broker中才会收到具体的任务内容。machinery中的任务提供了多种任务类型(主要是与Workflow相关,下文会详述),不同的任务有着类似但不相同的发布方法。在当前章节中,我们将首先介绍最基本的普通任务的发布:

// 业务代码
// 发送一个任务,taskObj由buildDownloadFromCos()返回
asyncResult, err := machinery.SendTask(&taskObj)
if err != nil {
   log.LoggerFromContextWithCaller(ctx).Errorf(err.Error())
   return
}

SendTask()中会调用Publish()函数去发布任务到broker,我们看到Publish()函数接受一个Signature类型的变量,首先将会根据任务参数ETA去判断任务类型是实时任务还是延时任务,从而扔到不同的任务队列中。一旦任务被扔到Broker中,worker就可以去获取并执行任务了。

// 源码
var redisDelayedTasksKey = "delayed_tasks"

func (b *RedisBroker) Publish(signature *tasks.Signature) error {
        ...
	// 根据ETA判断普通任务还是延时任务
  // ETA不为nil,延时任务,将ETA作为score到ZSET
	if signature.ETA != nil {
		now := time.Now().UTC()

		if signature.ETA.After(now) {
			score := signature.ETA.UnixNano()
                        
			_, err = conn.Do("ZADD", redisDelayedTasksKey, score, msg)
			return err
		}
	}

  // ETA为nil,普通任务,到LIST
	_, err = conn.Do("RPUSH", signature.RoutingKey, msg)
	return err
}

获取任务结果

// 业务代码
// asyncResult由上述发布任务函数machinery.SendTask()所返回
results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
        log.LoggerWrapperWithCaller().Errorf(err.Error())
        return
}

其中,asyncResult.Get()函数为一个异步阻塞函数,根据设定的参数轮询从backends中获取任务执行结果。以redis作为backends时为例,每一个任务的执行结果会存储于一条redis的key-value中,所以Get()函数最终会调用GET方法,根据任务的UUID,去redis中获取结果:

// 源码
// 根据uuid,获取该任务最近的状态 
func (b *RedisBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
	conn := b.open()
	defer conn.Close()

	item, err := redis.Bytes(conn.Do("GET", taskUUID))
	if err != nil {
		return nil, err
	}

	state := new(tasks.TaskState)
	decoder := json.NewDecoder(bytes.NewReader(item))
	decoder.UseNumber()
	if err := decoder.Decode(state); err != nil {
		return nil, err
	}

	return state, nil
}

通过上面的介绍,我们基本知道了在machinery中如何走一个最简单的任务操作及其原理。但是,如同我们在系列一中说到的,任务队列仅有这些功能是远远不够的,接下来我们将继续介绍machinery中的额外的任务队列功能。

任务重试机制

任务重试,算是任务队列的一个除了基本功能外的一个重要的基础功能,在上文中也已经顺带提到了。之所以会单独拎出来说,machinery中提供了两种方式来实现任务重试。

对于第一种,在上文中已经提到过,machinery中通过设置任务的RetryCount和RetryTimeout参数来实现,当任务执行出错后,会通过这两个参数来更新任务的ETA参数。RetryCount提供了重试次数,RetryTimeout提供了一个基于斐波那契数列的回退超时机制。

关于第二种,通过返回一个ErrRetryTaskLater类型的值来制定。

// 源码
// ErrRetryTaskLater ...
type ErrRetryTaskLater struct {
	name, msg string
	retryIn   time.Duration
}

同时需要注意的是,第二种方式的优先级高于第一种,也就是说,如果在新建任务的时候指定了RetryCount和RetryTimeout参数,但是在执行任务失败后反悔了ErrRetryTaskLater类型的数值,依然按照返回值里的参数来制定重试方式。

在系列之一的最后部分,我们提到了Worker处理中的Process()接口的实现,下面让我们再来看一下Process()实现中的任务重试部分的代码:

// 源码
// worker执行任务的函数
func (worker *Worker) Process(signature *tasks.Signature) error {
	
	...
	
	// 任务执行失败后,判定重试的逻辑
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}
		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}
		return worker.taskFailed(signature, err)
	}
	 
	...

}

我们看到,当任务执行失败后,将会首先判断tasks.ErrRetryTaskLater是否有被实现,根据是否实现了该接口,分别调用retryTaskIn或者taskRetry函数。其中,

  • retryTaskIn()函数将任务参数ETA重新,并重新发布该任务;
  • retryTask()函数将任务参数RetryCount减1,同时根据RetryTimeout参数更新ETA,并重新发布该任务;
// 源码
func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
   // 更新任务状态为Retry
   if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
      return fmt.Errorf("Set state retry error: %s", err)
   }

   // 将参数ETA更新为当前时间+retryIn.Seconds()
   eta := time.Now().UTC().Add(retryIn)
   signature.ETA = &eta

   log.WARNING.Printf("Task %s failed. Going to retry in %.0f seconds.", signature.UUID, retryIn.Seconds())

   // 重新发布任务
   _, err := worker.server.SendTask(signature)
   return err
}
// 源码
func (worker *Worker) taskRetry(signature *tasks.Signature) error {
   // Update task state to RETRY
   if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
      return fmt.Errorf("Set state retry error: %s", err)
   }

   // RetryCount - 1
   signature.RetryCount--

   // 更新Retrytimeout时间
   signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)

   // 跟新ETA参数
   eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
   signature.ETA = &eta

   log.WARNING.Printf("Task %s failed. Going to retry in %d seconds.", signature.UUID, signature.RetryTimeout)

   // 重新发布任务
   _, err := worker.server.SendTask(signature)
   return err
}

WorkFlow模式

运行一个任务,支持实时执行、或者延时执行,同时在出错后支持重试等功能,这些在部分场合已经十分适用了,但是,在更多场景下,我们需要执行的任务之间有上下依赖需要串行执行、或者任务之间完全并行不相关、又或者根据结果成功失败来执行回调的需求。为了满足这类需求,machinery中Workflow模式的作用体现出来了。

Chain链式任务

所谓任务链式调度,即一系列任务之间采用one by one的串行调度,只有前一个任务执行完毕,才会执行后一个任务。我们依然根据上文介绍普通任务的流程来介绍在machinery中对于链式任务的处理流程:

  • 链式任务创建

machinery中提供了NewChain()接口来实现链式任务的创建,如下所示,我们分别创建了两个任务实例task0和task1,然后通过NewChain()生成了链式任务chain实例:

// 调用代码
task0 := buildDownloadFromCos("audience.DownloadFromCos",appId, bucketName)
task1 := buildDoCompare("audience.DoCompare",appId, upstreamId)
chain, err := tasks.NewChain(&chain)
if err != nil {
    log.LoggerWrapperWithCaller().Errorf(err.Error())
    return
}
// 源码
func NewChain(signatures ...*Signature) (*Chain, error) {
   // uuid的生成逻辑
   for _, signature := range signatures {
      if signature.UUID == "" {

         signatureID, err := uuid.NewV4()

         if err != nil {
            return nil, fmt.Errorf("Error generating signature id: %s", err.Error())
         }

         signature.UUID = fmt.Sprintf("task_%v", signatureID)
      }
   }

   // 将所有任务拼装,并传递给Chain数据结构中的Tasks变量
   for i := len(signatures) - 1; i > 0; i-- {
      if i > 0 {
         signatures[i-1].OnSuccess = []*Signature{signatures[i]}
      }
   }

   chain := &Chain{Tasks: signatures}

   return chain, nil
}

...

// 链式任务数据结构
type Chain struct {
	Tasks []*Signature
}

我们重点看一下上述的任务拼装部分,通过链式任务的拼装,最终我们通过chain任务中的第一个任务,就可以看到chain中的所有任务,下方是一个chain任务存储于Broker中的示例,该chain任务中,一共有2个基本任务task,分别通过OnSuccess来实现任务的连接调用:

// Chain任务在Broker中的示例
{
    "UUID": "task_14a0f22c-b456-493c-9c9c-7cd2216d4339", 
    "Name": "audience.ExtractEsToFile", 
    "RoutingKey": "machinery_tasks", 
    "ETA": null, 
    "GroupUUID": "", 
    "GroupTaskCount": 0, 
    "Args": [
        {
            "Name": "", 
            "Type": "string", 
            "Value": "123456789"
        }, 
        {
            "Name": "", 
            "Type": "string", 
            "Value": "audience/2018-06-07/4703668260587260.data"
        }, 
        {
            "Name": "", 
            "Type": "string", 
            "Value": "[{"field":"gender.keyword","value":["女"],"operation":"and"},{"field":"job_title.keyword","value":["快?~R佑~X"],"operation":"and"}} ,{"field":"school_role.keyword","value":["维修工"],"operation":"and"}]"
        }
    ], 
    "Headers": null, 
    "Immutable": false, 
    "RetryCount": 3, 
    "RetryTimmeout": 5, 
    "OnSuccess": [
        {
            "UUID": "task_cab90b2c-2dc5-4035-91f6-4803df67c8c0", 
            "Name": "audience.EncryptExtractFile", 
            "RoutingKey": "", 
            "ETA": null, 
            "GroupUUID": "", 
            "GroupTaskCount": 0, 
            "Args": [
                {
                    "Name": "", 
                    "Type": "string", 
                    "Value": "123456789"
                }, 
                {
                    "Name": "", 
                    "Type": "int64", 
                    "Value": 29
                }
            ], 
            "Headers": null, 
            "Immutable": false, 
            "RetryCount": 3, 
            "RetryTimeout": 5, 
            "OnSuccess": [
                {
                    "UUID": "task_82f4f831-af24-4f71-950b-5d11abac3dea", 
                    "Name": "audience.UploadExtractFileToCos", 
                    "RoutingKey": "", 
                    "ETA": null, 
                    "GroupUUID": "", 
                    "GroupTaskCount": 0, 
                    "Args": [
                        {
                            "Name": "", 
                            "Type": "string", 
                            "Value": "123456789"
                        }, 
                        {
                            "Name": "", 
                            "Type": "string", 
                            "Value": "beta-abc"
                        }, 
                        {
                            "Name": "", 
                            "Type": "string", 
                            "Value": "ap-shanghai"
                        }, 
                        {
                            "Name": "", 
                            "Type": "string", 
                            "Value": "audience/2018-06-07/4703668260587260.data"
                        }
                    ], 
                    "Headers": null, 
                    "Immutable": false, 
                    "RetryCount": 3, 
                    "RetryTimeout": 5, 
                    "OnSuccess": null, 
                    "OnError": null, 
                    "ChordCallback": null
                }
            ], 
            "OnError": null, 
            "ChordCallback": null
        }
    ], 
    "OnError": null, 
    "ChordCallback": null
}
  • 链式任务发布

链式任务发布的方法与普通任务接口设计基本一致,直接通过调用SendChain()接口即可:

// 调用代码
asyncResult, err := mc.SendChain(&chain)
if err != nil {
 log.LoggerFromContextWithCaller(ctx).Errorf(err.Error())
 return
}

结合上述对chain任务创建的介绍,我们来看下SendChain()代码实现。如上文描述,由于第一个任务中已经包含了所有的后续任务信息,发布一个chain任务,实际上只需要发布第一个任务到Broker中即可:

// 源码
func (server *Server) SendChain(chain *tasks.Chain) (*backends.ChainAsyncResult, error) {
   _, err := server.SendTask(chain.Tasks[0])
   if err != nil {
      return nil, err
   }

   return backends.NewChainAsyncResult(chain.Tasks, server.backend), nil
}

因此,真正实现的chain中后续的任务调用,是由worker对每一个任务的OnSuccess参数检查来实现调度的,当worker中每一个任务调用成功后,都会触发taskSucceeded()执行,taskSucceeded中则会针对OnSuccess进行判断,如果发现存在OnSuccess,则会发布下一个任务,从而实现了任务的链式调度:

// 源码
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
  
   ... 

   for _, successTask := range signature.OnSuccess {
      if signature.Immutable == false {
         // Pass results of the task to success callbacks
         for _, taskResult := range taskResults {
            successTask.Args = append(successTask.Args, tasks.Arg{
               Type:  taskResult.Type,
               Value: taskResult.Value,
            })
         }
      }

      worker.server.SendTask(successTask)
   }
   
   ...

}
  • 链式任务结果获取

链式任务的结果获取,获取的是最后一个任务的执行结果,调用方式与普通任务获取无异,方式如下:

// 调用代码
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.LoggerWrapperWithCaller().Errorf(err.Error())
return
}
  • 链式任务参数传递

这时候,细心的同学可能会问了,如果串行执行的任务之间有参数依赖需要传递的话,如何实现呢?这时候我们回头看一下Signature的参数中有一个Immutable,chain任务会根据每一个任务的Immutable参数的数值来决定是否帮我们将上一个任务的返回值通过参数传递给chain中的下一个任务,如果immutable为true,那么参数将不会被传递下去,实现该部分的代码在任务执行成功的taskSucceeded()函数中。我们会看上一小节的taskSucceeded()代码,就可以看到对每一个任务的Immutable检查。

Group分组任务

分组任务,听起来可能不太理解,但如果换成另一个名词:并行任务,就很好理解了。即一个分组group内的所有任务,是相互独立,同时执行的,这在任务队列中也是有着相当大的使用场景的。

  • 分组任务创建

machinery中创建分组任务的接口是NewGroup(),可以接受多个普通任务参数,并生成一个group实例:

// 调用代码
group, err := tasks.NewGroup(&task0, &task1, &task2)
if err != nil {
    log.LoggerWrapperWithCaller().Errorf(err.Error())
}

在Group中的所有任务,没有执行顺序的要求,所有的任务将会由所有worker去竞争获取并执行。也是因此,创建分组任务的实现也十分简单,只需要进行简单的复制即可:

// 源码
func NewGroup(signatures ...*Signature) (*Group, error) {
   // 创建Group的uuid
   groupUUID, err := uuid.NewV4()
   
   ...

   for _, signature := range signatures {
      if signature.UUID == "" {

         signatureID, err := uuid.NewV4()

         if err != nil {
            return nil, fmt.Errorf("Error generating signature id: %s", err.Error())
         }

         signature.UUID = fmt.Sprintf("task_%v", signatureID)
      }
      signature.GroupUUID = groupID
      signature.GroupTaskCount = len(signatures)
   }

   return &Group{
      GroupUUID: groupID,
      Tasks:     signatures,
   }, nil
}

...

// 分组任务数据结构
type Group struct {
	GroupUUID string
	Tasks     []*Signature
}
  • 分组任务发布

machinery中发布分组任务接口是SendGroup(),支持将任务并行发布到Broker中,同时还支持了一个发送任务的并发控制,以防止同一时刻发布太多任务到任务队列,照顾到Broker的性能。

asyncResults, err := server.SendGroup(group, /*并发任务数量*/5)
if err != nil {
   return fmt.Errorf("Could not send group: %s", err.Error())
}

SendGroup()接口的代码如下,不必要的地方已经省略,同时添加了部分注释:

// 源码
func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*backends.AsyncResult, error) {
   
   ...
   
   wg.Add(len(group.Tasks))
   errorsChan := make(chan error, len(group.Tasks)*2)

   // 初始化group,主要负责在backend中根据group uuid创建一条记录,以存储该group任务的状态
   server.backend.InitGroup(group.GroupUUID, group.GetUUIDs())

   // 任务状态设置
   for _, signature := range group.Tasks {
      if err := server.backend.SetStatePending(signature); err != nil {
         errorsChan <- err
         continue
      }
   }

   // 并发控制
   pool := make(chan struct{}, sendConcurrency)
   go func() {
      for i := 0; i < sendConcurrency; i++ {
         pool <- struct{}{}
      }
   }()

   for i, signature := range group.Tasks {
      if sendConcurrency > 0 {
         <-pool
      }

      go func(s *tasks.Signature, index int) {
         defer wg.Done()
         // 发布任务
         err := server.broker.Publish(s)

         if sendConcurrency > 0 {
            pool <- struct{}{}
         }

         if err != nil {
            errorsChan <- fmt.Errorf("Publish message error: %s", err)
            return
         }

         asyncResults[index] = backends.NewAsyncResult(s, server.backend)
      }(signature, i)
   }

   ...
 
}
  • 分组任务结果获取

分组任务中每个任务的执行结果,全部存储于asyncResult 中,可通过遍历获取:

	// 调用代码
	for _, asyncResult := range asyncResults {
		results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
		if err != nil {
			return fmt.Errorf("Getting task result failed with error: %s", err.Error())
		}
		log.INFO.Printf(
			"%v + %v = %vn",
			asyncResult.Signature.Args[0].Value,
			asyncResult.Signature.Args[1].Value,
			tasks.HumanReadableResults(results),
		)
	}

Chord任务

chord任务,其功能是分组任务+回调任务。即,chord任务允许我们在并行执行完毕所有一组group任务之后,回调一个callback任务,这也是一个有着非常多的应用场景。

  • chord任务创建

machinery通过NewChord()接口实现chord任务创建,接受两个参数,分别是group任务实例和回调函数:

// 调用代码
chord, err := tasks.NewChord(group, &cbTask)
if err != nil {
   return fmt.Errorf("Error creating chord: %s", err)
}

而NewChord()函数主要负责给group中每个任务的ChordCallback赋值为需要回调的函数:

// 源码
func NewChord(group *Group, callback *Signature) (*Chord, error) {
   
   ...

   for _, signature := range group.Tasks {
      signature.ChordCallback = callback
   }
   return &Chord{Group: group, Callback: callback}, nil
}

...

// Chord任务数据结构
type Chord struct {
	Group    *Group
	Callback *Signature
}
  • chord任务发布

chord任务发布比较也比较简单,接口SendChord()与SendGroup()类似,而SendChord()中的代码逻辑也比较简单,即简单的warp了一下SendGroup任务。

// 调用代码
chordAsyncResult, err := server.SendChord(chord, /*并发任务数量*/5)
if err != nil {
   return fmt.Errorf("Could not send chord: %s", err.Error())
}
// 源码
func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*backends.ChordAsyncResult, error) {
   _, err := server.SendGroup(chord.Group, sendConcurrency)
   if err != nil {
      return nil, err
   }

   return backends.NewChordAsyncResult(
      chord.Group.Tasks,
      chord.Callback,
      server.backend,
   ), nil
}

而主要对chord的回调函数的调用,则同样是通过任务执行成功后的执行函数taskSucceeded()来实现,其相关代码如下,具体的逻辑可以从添加的注释中了解:

// 源码
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
   
   ... 
   
   // 若该任务不是groupe任务中的, return
	if signature.GroupUUID == "" {
		return nil
	}

	// 检查是否group中的所有任务都执行完毕
	groupCompleted, err := worker.server.GetBackend().GroupCompleted(
		signature.GroupUUID,
		signature.GroupTaskCount,
	)
	if err != nil {
		return fmt.Errorf("Group completed error: %s", err)
	}
	if !groupCompleted {
		return nil
	}

   ...

   // 若无chord callback, return
   if signature.ChordCallback == nil {
      return nil
   }

   // 触发chord callback, 确保只触发一次
   shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
   if err != nil {
      return fmt.Errorf("Trigger chord error: %s", err)
   }
   if !shouldTrigger {
      return nil
   }

   ...

   // 发送chord任务
   _, err = worker.server.SendTask(signature.ChordCallback)
   if err != nil {
      return err
   }

   return nil
}
  • chord任务结果获取

chord任务的结果获取与其他的操作无异,如下所示:

// 调用代码
results, err = chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
   return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
}

总结

通过两个系列的篇幅,我们详细的介绍了golang中任务队列machinery的使用和原理,从而为众多golang使用者们推荐了一款好用的任务队列,并从源码层解析了其详细实现。

同时,本文主要是基于Redis作为存储介质来进行详细介绍,而Redis中缺乏如Ack之类的机制,尽管可以通过LUA脚本简介实现,但是在云时代的Redis集群对EVAL接口支持的能力还有不足,使得使用起来还是略有欠缺。因此,对于任务可靠性要求更高的,可以使用基于AMQP的方案来使用,更多关于AMQP的实现接口,基本与Redis下一致,大家可以进一步阅读源码。