今天为大家介绍一个Go处理异步任务的解决方案:Asynq,是一个 Go 库,用于排队任务并与 worker 异步处理它们。它由Redis提供支持,旨在实现可扩展且易于上手。

一、概述

Asynq 是一个 Go 库,用于对任务进行排队并与工作人员异步处理它们。

  • Asynq 工作原理的高级概述:
    • 客户端将任务放入队列
    • 服务器从队列中拉出任务并为每个任务启动一个工作 goroutine
    • 多个工作人员同时处理任务
  • git库地址:

https://github.com/hibiken/asynq

二、快速开始

1. 准备工作

确保已安装并运行了redis

redis 版本大于5.0

redis-server

目录结构

.
├── conf
│   └── redis.conf
└── docker-compose.yml

docker-compose.yml

conf/redis.conf

redis

docker-compose up -d

2. 安装asynq软件包

go get -u github.com/hibiken/asynq

3. 创建项目asynq_task

目录结构:

.
|-- README.md
|-- cmd
|   `-- main.go  # 启动消费者监听
|-- go.mod
|-- go.sum
|-- test.go # 生产者 发送测试数据
`-- test_delivery
    |-- client 
    |   `-- client.go # 生产者 具体发送测试数据的逻辑
    `-- test_delivery.go  # 消费者,执行任务具体处理逻辑

Redis连接项

AsynqRedis
client.gomain.goRedis
asynq.RedisClientOptRedis

4. Task任务

*asynq.Task

5. 编写程序

1)test_delivery.go 一个封装任务创建和任务处理的包

client.go

Client

main.go 异步任务服务入口文件

ServerHandlerServeMuxnet/httpHandler

4)test.go 用来分发异步任务

6. 运行查看结果

首先,我们要先把异步任务启动起来准备好接收,也就是启动cmd/main.go

启动test.go文件向异步任务服务添加任务队列

结果如下:

go run main.go
go run test.go

三、细节

1. 关于asynq的优雅退出

如果异步服务突然被暂停,正在执行的异步任务会push到队列中,下次启动的时候自动执行。

我们可以将一个异步任务中途sleep几秒,发送一个异步任务,任务没执行完中途停掉任务测试出结果:

再次启动异步任务服务,发现这个任务被重新执行。

2. client中 client.Enqueue 的使用

立即处理任务

client.Enqueue(t1, time.Now())

2)延时处理任务, 两小时后处理

client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))

任务重试,最大重试次数为25次。

client.Enqueue(task, asynq.MaxRetry(5))

4)确保任务的唯一性

4-1:使用TaskID选项:自行生成唯一的任务 ID

4-2:使用Unique选项:让 Asynq 为任务创建唯一性锁

asynqAsynqmonAsynqWebUIPrometheus

启动服务:

docker-compose up

访问:

http://192.168.0.120:8980/

总结