Go异步任务 (machinery篇)
machinery源码解读
# Broker connection URL. Redis is the active choice; alternatives are kept
# commented out for reference.
broker: 'redis://localhost:6379'
#broker: 'redis://localhost:6379'
#broker: 'https://sqs.us-west-2.amazonaws.com/123456789012'
# Queue used when a task does not specify its own routing key.
default_queue: machinery_tasks
# Result backend connection URL; alternatives are kept commented out.
result_backend: 'redis://localhost:6379'
#result_backend: 'memcache://localhost:11211'
#result_backend: 'mongodb://localhost:27017'
results_expire_in: 3600000
# AMQP-specific settings. These keys must be nested under `amqp`; without the
# indentation `amqp` parses as null and the keys become top-level entries.
amqp:
  binding_key: machinery_task
  exchange: machinery_exchange
  exchange_type: direct
  prefetch_count: 3
复制代码
2. broker prepare task:准备并发条件 chan
// StartConsuming enters a loop and waits for incoming messages.
//
// This first part of the method verifies the Redis connection with PING,
// creates the stop channels and wait-group counts used by the background
// goroutines, and builds the deliveries channel plus the token pool that
// bounds concurrency. On a failed PING it returns b.GetRetry() together with
// the error.
func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
// Delegate to the embedded base broker first. Per the article author's note,
// this shared method initializes b.retryFunc, b.stopChan and b.retryStopChan.
b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor)
// NOTE(review): b.pool is reset to nil here, yet b.pool.Close() is deferred
// below - presumably b.open() (re)creates the pool; confirm.
b.pool = nil
conn := b.open()
defer conn.Close()
defer b.pool.Close()
// Ping the server to make sure connection is live
_, err := conn.Do("PING")
if err != nil {
// Invoke the retry function with the retry stop channel before reporting
// the error to the caller.
b.GetRetryFunc()(b.GetRetryStopChan())
return b.GetRetry(), err
}
// Channels and wait groups used to properly close down goroutines
b.stopReceivingChan = make(chan int)
b.stopDelayedChan = make(chan int)
b.receivingWG.Add(1) // one count, released by the receiving goroutine
b.delayedWG.Add(1)
// Channel to which we will push tasks ready for processing by worker.
// It is unbuffered, so every send blocks until a consumer receives.
deliveries := make(chan []byte)
// Concurrency limit: a channel with room for `concurrency` tokens.
pool := make(chan struct{}, concurrency)
// Fill the pool with `concurrency` tokens in the background.
go func() {
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
}()
// Helper function to return true if parallel task processing slots still available,
// false when we are already executing maximum allowed concurrent tasks.
// A concurrency of 0 always reports true. Because deliveries is unbuffered,
// len(deliveries) is always 0, so the second operand reduces to len(pool) > 0.
var concurrencyAvailable = func() bool {
return concurrency == 0 || (len(pool)-len(deliveries) > 0)
}
// Timer is added otherwise when the pools were all active it will spin the for loop
var (
timerDuration = time.Duration(100000000 * time.Nanosecond) // 100 milliseconds
// A zero-duration timer, so the first receive from timer.C succeeds right away.
timer = time.NewTimer(0)
)
复制代码
3. worker 正常消费list 队列中的任务 并将任务放入deliveries chan 等待消费
// A receiving goroutine keeps popping messages from the queue by BLPOP
// (via b.nextTask) and sends each popped payload to the deliveries channel.
// It exits when a value arrives on b.stopReceivingChan and releases
// b.receivingWG on the way out.
go func() {
defer b.receivingWG.Done()
log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C")
for {
select {
// A way to stop this goroutine from b.StopConsuming
case <-b.stopReceivingChan:
return
// The timer paces the loop; every path below re-arms it with Reset.
case <-timer.C:
// If concurrency is limited, limit the tasks being pulled off the queue
// until a pool is available
if concurrencyAvailable() {
task, err := b.nextTask(getQueue(b.GetConfig(), taskProcessor))
if err != nil {
// something went wrong, wait a bit before continuing the loop
timer.Reset(timerDuration)
continue
}
// Blocks until a consumer receives, because deliveries is unbuffered.
deliveries <- task
}
if concurrencyAvailable() {
// parallel task processing slots still available, continue loop immediately
timer.Reset(0)
} else {
// using all parallel task processing slots, wait a bit before continuing the loop
timer.Reset(timerDuration)
}
}
}
}()
复制代码
题外话:关于如何获取nextTask:利用redis list数据结构 blpop
// nextTask pops the next available task from the given queue.
//
// It issues a Redis BLPOP (the blocking variant of LPOP) on queue with a
// timeout argument of 1 and returns the popped payload. Per the article, the
// queue name comes from the worker's Queue setting, so each worker consumes
// its own list.
//
// On a command error, an empty slice and that error are returned. If the
// reply does not consist of exactly two items (key name, popped value), an
// empty slice and redis.ErrNil are returned.
func (b *Broker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	reply, popErr := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
	switch {
	case popErr != nil:
		return []byte{}, popErr
	case len(reply) != 2:
		// reply[0] is the key the element was popped from and reply[1] is
		// the element itself; any other shape is treated as "nothing popped".
		return []byte{}, redis.ErrNil
	}

	return reply[1], nil
}
复制代码
4. 延迟任务的获取:所有的延迟任务都在一个zset中,每次原子性的获取最近的延迟任务
// A goroutine to watch for delayed tasks and hand them back to the broker
// via b.Publish so that they are consumed like any other task.
// It exits when a value arrives on b.stopDelayedChan and releases
// b.delayedWG on the way out.
go func() {
	defer b.delayedWG.Done()

	for {
		select {
		// A way to stop this goroutine from b.StopConsuming
		case <-b.stopDelayedChan:
			return
		default:
			task, err := b.nextDelayedTask(redisDelayedTasksKey)
			if err != nil {
				continue
			}

			signature := new(tasks.Signature)
			decoder := json.NewDecoder(bytes.NewReader(task))
			// Keep numbers as json.Number instead of converting them to float64.
			decoder.UseNumber()
			if err := decoder.Decode(signature); err != nil {
				log.ERROR.Print(errs.NewErrCouldNotUnmarshaTaskSignature(task, err))
				// The payload could not be decoded; publishing the zero or
				// partially filled signature would enqueue a bogus task,
				// so skip it.
				continue
			}

			if err := b.Publish(signature); err != nil {
				log.ERROR.Print(err)
			}
		}
	}
}()
复制代码
题外话:b.nextDelayedTask 如何获取延迟队列:将zset中的延迟任务重新push到对应的list 使用 watch 实现 zpop 原子性的zrem
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC
复制代码
5. 真正消费deliveries中的tasks(这篇文章的下段会讲到消费),前期都是对task的准备
// Hand the deliveries channel and the concurrency pool to b.consume. Any error
// it returns is passed to the caller together with the retry flag.
if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
return b.GetRetry(), err
}
// Waiting for any tasks being processed to finish
b.processingWG.Wait()
return b.GetRetry(), nil
复制代码
题外话(思考):从准备消费队列的过程中 学到了什么
- 如何通过 redis WATCH MULTI EXEC 实现ZPOP
- 设置 receivingWG waitgroup 的用处(看代码):为了保证在 StopConsuming 的时候,负责把任务 push 到 deliveries chan 的 goroutine 已经真正退出
// StopConsuming quits the loop.
//
// It shuts down in a fixed order: signal and wait for the receiving goroutine,
// signal and wait for the delayed-tasks goroutine, call b.stopConsuming(), and
// finally wait for tasks that are still being processed.
//
// NOTE(review): the receiver here is *RedisBroker while the other broker
// methods in this article use *Broker - presumably the snippets were taken
// from different versions of the library; confirm.
func (b *RedisBroker) StopConsuming() {
// Stop the receiving goroutine
b.stopReceivingChan <- 1
// Waiting for the receiving goroutine to have stopped, i.e. until it has
// released the wait group on exit.
b.receivingWG.Wait()
// Stop the delayed tasks goroutine
b.stopDelayedChan <- 1
// Waiting for the delayed tasks goroutine to have stopped
b.delayedWG.Wait()
// NOTE(review): stopConsuming is not shown here; presumably it performs the
// shared base-broker shutdown - confirm.
b.stopConsuming()
// Waiting for any tasks being processed to finish
b.processingWG.Wait()
}
复制代码
##### 以上的操作都是 broker 准备队列数据,接下来我们会讲到 task 从开始到结束会经历哪些状态
1. task be sent:send task 的时候会用 redis kv 记录 task 的 status,状态从初始状态开始依次流转:PENDING -> RECEIVED -> STARTED -> RETRY -> SUCCESS/FAILURE。send task 返回一个异步结果 result.AsyncResult
// SendTask publishes a task through the server's broker.
//
// It requires a result backend, assigns a "task_<uuid>" UUID to the signature
// when none is set, records the PENDING state in the backend, and then
// publishes the signature. On success it returns an AsyncResult bound to the
// signature and the backend.
//
// Errors from the backend and the broker are wrapped with %w, so callers can
// still inspect the underlying cause with errors.Is / errors.As; the message
// text is unchanged.
func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error) {
	// Make sure result backend is defined
	if server.backend == nil {
		return nil, errors.New("Result backend required")
	}

	// Auto generate a UUID if not set already
	if signature.UUID == "" {
		taskID := uuid.New().String()
		signature.UUID = fmt.Sprintf("task_%v", taskID)
	}

	// Set initial task state to PENDING
	if err := server.backend.SetStatePending(signature); err != nil {
		return nil, fmt.Errorf("Set state pending error: %w", err)
	}

	if err := server.broker.Publish(signature); err != nil {
		return nil, fmt.Errorf("Publish message error: %w", err)
	}

	return result.NewAsyncResult(signature, server.backend), nil
}
复制代码
题外话:在setRetry的时候用了 斐波那契:
package retry
// Fibonacci returns a generator of successive Fibonacci numbers starting
// from 1 (1, 1, 2, 3, 5, ...). Each returned generator keeps its own state.
func Fibonacci() func() int {
	prev, cur := 0, 1

	return func() int {
		next := prev + cur
		prev, cur = cur, next
		return prev
	}
}
// FibonacciNext returns the first number in the Fibonacci sequence that is
// strictly greater than start.
func FibonacciNext(start int) int {
	next := Fibonacci()

	for {
		if candidate := next(); candidate > start {
			return candidate
		}
	}
}
复制代码
2. tasks be published:
// Publish places a new message on a queue.
//
// The signature is marshaled to JSON. If its ETA is set and lies in the
// future, the message is added with ZADD to the delayed-tasks sorted set,
// scored by the ETA in Unix nanoseconds. Otherwise it is appended with RPUSH
// to the list named by signature.RoutingKey.
//
// A marshaling failure is wrapped with %w (message text unchanged); errors
// from the Redis commands are returned as-is.
func (b *Broker) Publish(signature *tasks.Signature) error {
	// Adjust routing key (this decides which queue the message will be published to)
	b.Broker.AdjustRoutingKey(signature)

	msg, err := json.Marshal(signature)
	if err != nil {
		return fmt.Errorf("JSON marshal error: %w", err)
	}

	conn := b.open()
	defer conn.Close()

	// Check the ETA signature field, if it is set and it is in the future,
	// delay the task
	if signature.ETA != nil && signature.ETA.After(time.Now().UTC()) {
		score := signature.ETA.UnixNano()
		_, err = conn.Do("ZADD", redisDelayedTasksKey, score, msg)
		return err
	}

	_, err = conn.Do("RPUSH", signature.RoutingKey, msg)
	return err
}
复制代码
3. tasks be consumed:查看任务是否已注册 -> 设置任务状态为 RECEIVED -> 准备任务(tasks.New)-> 设置状态为 STARTED 并执行任务 -> 失败重试(里面有斐波那契 retry)-> 得到 result -> 触发 success callback
// Process handles received tasks and triggers success/error callbacks.
//
// Flow: skip tasks that are not registered with this worker, mark the task
// RECEIVED, build the task from the registered function and the signature
// arguments, attach a tracing span to the task context, mark the task STARTED,
// call it, and then dispatch to the retry / failed / succeeded handlers.
//
// Errors from the two backend state updates are wrapped with %w, so callers
// can still inspect the underlying cause; the message text is unchanged.
func (worker *Worker) Process(signature *tasks.Signature) error {
	// If the task is not registered with this worker, do not continue
	// but only return nil as we do not want to restart the worker process
	if !worker.server.IsTaskRegistered(signature.Name) {
		return nil
	}

	taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
	if err != nil {
		// NOTE(review): the lookup error is dropped here, presumably for the
		// same "do not restart the worker" reason as above - confirm.
		return nil
	}

	// Update task state to RECEIVED
	if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
		return fmt.Errorf("Set state received error: %w", err)
	}

	// Prepare task for processing
	task, err := tasks.New(taskFunc, signature.Args)
	// if this failed, it means the task is malformed, probably has invalid
	// signature, go directly to task failed without checking whether to retry
	if err != nil {
		worker.taskFailed(signature, err)
		return err
	}

	// try to extract trace span from headers and add it to the function context
	// so it can be used inside the function if it has context.Context as the first
	// argument. Start a new span if it isn't found.
	taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
	tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
	task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

	// Update task state to STARTED
	if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
		return fmt.Errorf("Set state started error: %w", err)
	}

	// Call the task
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}

		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}

		return worker.taskFailed(signature, err)
	}

	return worker.taskSucceeded(signature, results)
}
复制代码
转载注明出处:www.xiaoweiluo.cf