Go异步任务 (machinery篇)

machinery源码解读

    broker: 'redis://localhost:6379'
    #broker: 'redis://localhost:6379'
    #broker: 'https://sqs.us-west-2.amazonaws.com/123456789012'
    default_queue: machinery_tasks

    result_backend: 'redis://localhost:6379'
    #result_backend: 'memcache://localhost:11211'
    #result_backend: 'mongodb://localhost:27017'
    results_expire_in: 3600000
    
    amqp:
      binding_key: machinery_task
      exchange: machinery_exchange
      exchange_type: direct
      prefetch_count: 3

复制代码

2. broker prepare task:准备并发条件 chan

  // StartConsuming enters a loop and waits for incoming messages.
  //
  // This part of the function only prepares the state used by the goroutines
  // started further down: it verifies the Redis connection with PING, creates
  // the stop channels and wait groups, and builds the deliveries channel plus
  // the token pool that bounds concurrency.
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
	b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)// shared base method; per the article author it initializes b.retryFunc, b.stopChan and b.retryStopChan

	b.pool = nil
	conn := b.open()
	defer conn.Close()
	defer b.pool.Close()

	// Ping the server to make sure connection is live
	_, err := conn.Do("PING")
	if err != nil {
		// Run the retry function before reporting the error to the caller
		b.GetRetryFunc()(b.GetRetryStopChan())
		return b.GetRetry(), err
	}

	// Channels and wait groups used to properly close down goroutines
	b.stopReceivingChan = make(chan int)
	b.stopDelayedChan = make(chan int)
	b.receivingWG.Add(1) // WG = wait group
	b.delayedWG.Add(1)

	// Channel to which we will push tasks ready for processing by worker
	// (tasks waiting to be consumed)
	deliveries := make(chan []byte)
	// Token pool whose capacity is the concurrency limit
	pool := make(chan struct{}, concurrency) 

	// initialize worker pool with maxWorkers workers
	// (fills the pool with one token per allowed concurrent task)
	go func() {
		for i := 0; i < concurrency; i++ {
			pool <- struct{}{}
		}
	}()

	// Helper function to return true if parallel task processing slots still available, 
	// false when we are already executing maximum allowed concurrent tasks.
	// A concurrency of 0 means "no limit" and always reports true.
	// Note: deliveries is created unbuffered above, so len(deliveries) is always 0.
	var concurrencyAvailable = func() bool {
		return concurrency == 0 || (len(pool)-len(deliveries) > 0)
	}

	// Timer is added otherwise when the pools were all active it will spin the for loop
	var (
		timerDuration = time.Duration(100000000 * time.Nanosecond) // 100 milliseconds
		timer         = time.NewTimer(0) // zero duration: fires immediately the first time
	)

复制代码

3. worker 正常消费list 队列中的任务 并将任务放入deliveries chan 等待消费

// A receiving goroutine keeps popping messages from the queue by BLPOP
	// If the message is valid and can be unmarshaled into a proper structure
	// we send it to the deliveries channel
	go func() {
		// Signal StopConsuming (which waits on receivingWG) that this goroutine has exited
		defer b.receivingWG.Done()

		log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")

		for {
			select {
			// A way to stop this goroutine from b.StopConsuming
			case <-b.stopReceivingChan:
				return
			case <-timer.C:
				// If concurrency is limited, limit the tasks being pulled off the queue
				// until a pool is available
				if concurrencyAvailable() {
					task, err := b.nextTask(getQueue(b.GetConfig(), taskProcessor))
					if err != nil {
						// something went wrong, wait a bit before continuing the loop
						timer.Reset(timerDuration)
						continue
					}

					// Hand the raw task bytes over to the consumer side
					deliveries <- task
				}
				// Re-arm the timer; every path through this case must reset it,
				// otherwise timer.C never fires again and the loop stalls
				if concurrencyAvailable() {
					// parallel task processing slots still available, continue loop immediately
					timer.Reset(0)
				} else {
					// using all parallel task processing slots, wait a bit before continuing the loop
					timer.Reset(timerDuration)
				}
			}
		}
	}()


复制代码

题外话:关于如何获取nextTask:利用redis list数据结构 blpop

 // nextTask pops the next available task from the given queue.
 //
 // It issues BLPOP (the blocking variant of LPOP) against queue with a timeout
 // argument of 1, so the call waits briefly for an element instead of
 // returning at once on an empty list. The queue name is chosen by the caller;
 // the article notes that worker.Queue selects which queue a worker consumes.
 //
 // On success the raw bytes of the popped element are returned. A command
 // error is returned unchanged; a reply that is not exactly a (key, value)
 // pair is reported as redis.ErrNil. In both failure cases the byte slice is
 // empty.
func (b *Broker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	reply, popErr := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
	switch {
	case popErr != nil:
		return []byte{}, popErr
	case len(reply) != 2:
		// reply[0] would be the key the element was popped from and
		// reply[1] the element itself; anything else is treated as "nothing popped"
		return []byte{}, redis.ErrNil
	}

	return reply[1], nil
}

复制代码

4. 延迟任务的获取:所有的延迟任务都在一个zset中,每次原子性的获取最近的延迟任务

	// A goroutine to watch for delayed tasks and hand them back for
	// consumption by the worker. Note that the code below does not write to
	// the deliveries channel directly: each due task is re-published through
	// b.Publish.
	   go func() {
		  // Signal StopConsuming (which waits on delayedWG) that this goroutine has exited
		  defer b.delayedWG.Done()

		  for {
			 select {
			 // A way to stop this goroutine from b.StopConsuming
			 case <-b.stopDelayedChan:
				return
			 default:
				task, err := b.nextDelayedTask(redisDelayedTasksKey)
				if err != nil {
					// No task obtained this round; try again on the next iteration
					continue
				}

				// Decode with UseNumber so numeric values are kept as json.Number
				// instead of being converted to float64
				signature := new(tasks.Signature)
				decoder := json.NewDecoder(bytes.NewReader(task))
				decoder.UseNumber()
				if err := decoder.Decode(signature); err != nil {
					log.ERROR.Print(errs.NewErrCouldNotUnmarshaTaskSignature(task, err))
				}

				// NOTE(review): a decode failure above is only logged; the
				// signature is still passed to Publish here. Confirm this is intended.
				if err := b.Publish(signature); err != nil {
					log.ERROR.Print(err)
				}
			}
		}
	   }()
复制代码

题外话:b.nextDelayedTask 如何获取延迟队列:将zset中的延迟任务重新push到对应的list 使用 watch 实现 zpop 原子性的zrem

    WATCH zset
    element = ZRANGE zset 0 0
    MULTI
    ZREM zset element
    EXEC
复制代码

5. 真正消费deliveries中的tasks(这篇文章的下段会讲到消费),前期都是对task的准备

     if err := b.consume(deliveries, pool, concurrency,     taskProcessor); err != nil {
		  return b.GetRetry(), err
	   }

	   // Waiting for any tasks being processed to finish
	   b.processingWG.Wait()

	   return b.GetRetry(), nil
复制代码

题外话(思考):从准备消费队列的过程中 学到了什么

  1. 如何通过 redis WATCH MULTI EXEC 实现ZPOP
  2. 设置receivingWG waitgroup 的用处 (看代码):为了保证在 stopConsuming 的时候push deliver into chan 真正关闭
// StopConsuming quits the loop.
//
// Shutdown happens in a fixed order: signal the receiving goroutine and wait
// for it, signal the delayed-tasks goroutine and wait for it, call
// b.stopConsuming(), and finally wait for in-flight tasks tracked by
// processingWG.
func (b *RedisBroker) StopConsuming() {
	   // Stop the receiving goroutine
	   b.stopReceivingChan <- 1 
	   // Wait until the receiving goroutine has actually stopped
	   // (it calls receivingWG.Done on exit)
	   b.receivingWG.Wait()

	   // Stop the delayed tasks goroutine
	   b.stopDelayedChan <- 1
	   // Waiting for the delayed tasks goroutine to have stopped
	   b.delayedWG.Wait()

	   b.stopConsuming()

	   // Waiting for any tasks being processed to finish
	   b.processingWG.Wait()
}
复制代码

##### 以上的操作都是 broker 准备队列数据,接下来我们会讲到 task 从开始到结束状态会经历什么

  1. task is sent:send task 的时候会用 redis kv 记录 task 的 status,状态从初始状态开始依次流转:PENDING -> RECEIVED -> STARTED -> RETRY -> SUCCESS/FAILURE。send task 返回一个异步结果 result.AsyncResult
    // SendTask publishes a task to the default queue.
    //
    // Steps, in order:
    //   1. refuse to continue when no result backend is configured;
    //   2. assign a "task_"-prefixed UUID when the signature has none;
    //   3. record the PENDING state in the result backend;
    //   4. publish the signature through the broker.
    //
    // On success it returns an AsyncResult bound to the signature and the
    // backend. Any failing step returns a nil result and an error.
func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error) {
	if server.backend == nil {
		return nil, errors.New("Result backend required")
	}

	// Only generate an identifier when the caller did not supply one
	if signature.UUID == "" {
		signature.UUID = "task_" + uuid.New().String()
	}

	// The state is written before publishing, so the task is already
	// PENDING in the backend when the broker receives it
	if pendingErr := server.backend.SetStatePending(signature); pendingErr != nil {
		return nil, fmt.Errorf("Set state pending error: %s", pendingErr)
	}

	if publishErr := server.broker.Publish(signature); publishErr != nil {
		return nil, fmt.Errorf("Publish message error: %s", publishErr)
	}

	asyncResult := result.NewAsyncResult(signature, server.backend)
	return asyncResult, nil
}
复制代码

题外话:在setRetry的时候用了 斐波那契:

    package retry
    // Fibonacci returns a generator of successive Fibonacci numbers starting
    // from 1. Each call to the returned function yields the next value of the
    // sequence 1, 1, 2, 3, 5, ... Every generator keeps its own state.
func Fibonacci() func() int {
	prev, cur := 0, 1
	return func() int {
		// Hand out the current value, then advance the pair for the next call
		value := cur
		prev, cur = cur, prev+cur
		return value
	}
}
// FibonacciNext returns the first number of the Fibonacci sequence that is
// strictly greater than start. For any start below 1 the result is 1.
func FibonacciNext(start int) int {
	next := Fibonacci()
	for {
		// Keep drawing from the generator until a value exceeds start
		if candidate := next(); candidate > start {
			return candidate
		}
	}
}
    
复制代码

2. task is published:

    // Publish places a new message on the default queue.
    //
    // The signature's routing key is adjusted first (it decides which queue
    // receives the message), then the signature is serialized to JSON.
    // A signature whose ETA lies in the future is stored with ZADD under
    // redisDelayedTasksKey, scored by the ETA in Unix nanoseconds. Every other
    // signature is appended with RPUSH to the list named by its RoutingKey.
    //
    // It returns a wrapped error when JSON encoding fails, otherwise the error
    // (if any) of the Redis command that was issued.
func (b *Broker) Publish(signature *tasks.Signature) error {
	b.Broker.AdjustRoutingKey(signature)

	payload, marshalErr := json.Marshal(signature)
	if marshalErr != nil {
		return fmt.Errorf("JSON marshal error: %s", marshalErr)
	}

	conn := b.open()
	defer conn.Close()

	// Delayed path: only taken when an ETA is set and has not passed yet
	if eta := signature.ETA; eta != nil && eta.After(time.Now().UTC()) {
		_, delayErr := conn.Do("ZADD", redisDelayedTasksKey, eta.UnixNano(), payload)
		return delayErr
	}

	// Immediate path: enqueue for consumption right away
	_, pushErr := conn.Do("RPUSH", signature.RoutingKey, payload)
	return pushErr
}
复制代码

3. task is consumed:查看任务是否被注册了 - 设置任务的状态为 RECEIVED - 准备(构造)任务 - 设置状态为 STARTED 并执行任务 - 失败重试(里面有斐波那契 retry) - 得到 result - 设置成功回调

// Process handles a received task and triggers the success/error callbacks.
//
// Flow:
//   - a task that is not registered with this worker is skipped and nil is
//     returned, so that the worker process is not restarted;
//   - the state is set to RECEIVED, the task is built from the registered
//     function and the signature arguments, a tracing span is attached to the
//     task context, and the state is set to STARTED;
//   - the task is called. On success taskSucceeded is invoked. On error the
//     task is retried after the requested delay when the error is a
//     tasks.ErrRetryTaskLater, retried via taskRetry when RetryCount > 0,
//     and otherwise passed to taskFailed.
//
// The returned error is either a state-update error, the task construction
// error, or whatever the selected success/retry/failure handler returns.
func (worker *Worker) Process(signature *tasks.Signature) error {
	if registered := worker.server.IsTaskRegistered(signature.Name); !registered {
		return nil
	}

	handler, lookupErr := worker.server.GetRegisteredTask(signature.Name)
	if lookupErr != nil {
		// Lookup failures are deliberately not reported to the caller
		return nil
	}

	// State: RECEIVED
	if stateErr := worker.server.GetBackend().SetStateReceived(signature); stateErr != nil {
		return fmt.Errorf("Set state received error: %s", stateErr)
	}

	// A construction failure means the task is malformed (probably an
	// invalid signature), so it goes straight to failed without any retry
	job, buildErr := tasks.New(handler, signature.Args)
	if buildErr != nil {
		worker.taskFailed(signature, buildErr)
		return buildErr
	}

	// Extract the trace span from the signature headers (a new span is
	// started when none is found) and expose it through the task context so
	// the task function can use it if it takes a context.Context first
	span := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
	tracing.AnnotateSpanWithSignatureInfo(span, signature)
	job.Context = opentracing.ContextWithSpan(job.Context, span)

	// State: STARTED
	if stateErr := worker.server.GetBackend().SetStateStarted(signature); stateErr != nil {
		return fmt.Errorf("Set state started error: %s", stateErr)
	}

	results, callErr := job.Call()
	if callErr == nil {
		return worker.taskSucceeded(signature, results)
	}

	// The task itself asked to be retried after a specific duration
	if retryLater, ok := interface{}(callErr).(tasks.ErrRetryTaskLater); ok {
		return worker.retryTaskIn(signature, retryLater.RetryIn())
	}

	// Default retry logic driven by signature.RetryCount and
	// signature.RetryTimeout
	if signature.RetryCount > 0 {
		return worker.taskRetry(signature)
	}

	return worker.taskFailed(signature, callErr)
}
复制代码

转载注明出处:www.xiaoweiluo.cf