taskq 是一个带有 Redis、SQS、IronMQ 和内存后端的 Golang 异步任务/作业队列。

特性:

  • Redis、SQS、IronMQ 和内存后端。
  • 自动缩放用于获取(fetcher)和处理消息(worker)的 goroutines 的数量。
  • 全局速率限制。
  • 工人的全球限制。
  • 调用一次 - 删除具有相同名称的重复消息。
  • 使用指数退避自动重试。
  • 当队列中的所有消息都失败时自动暂停。
  • 用于处理失败消息的后备处理程序。
  • 消息批处理。它用于 SQS 和 IronMQ 后端批量添加/删除消息。
  • 使用 snappy / s2 自动压缩消息。

API overview

 t := myQueue.RegisterTask(&taskq.TaskOptions{ Name: "greeting", Handler: func(name string) error { fmt.Println("Hello", name) return nil }, }) // Say "Hello World". err := myQueue.Add(t.WithArgs(context.Background(), "World")) if err != nil { panic(err) } // Say "Hello World" with 1 hour delay. msg := t.WithArgs(ctx, "World") msg.Delay = time.Hour _ = myQueue.Add(msg) // Say "Hello World" once. for i := 0; i < 100; i++ { msg := t.WithArgs(ctx, "World") msg.Name = "hello-world" // unique _ = myQueue.Add(msg) } // Say "Hello World" once with 1 hour delay. for i := 0; i < 100; i++ { msg := t.WithArgs(ctx, "World") msg.Name = "hello-world" msg.Delay = time.Hour _ = myQueue.Add(msg) } // Say "Hello World" once in an hour. for i := 0; i < 100; i++ { msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour) _ = myQueue.Add(msg) } // Say "Hello World" for Europe region once in an hour. for i := 0; i < 100; i++ { msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe") _ = myQueue.Add(msg) }