因为业务需要用到异步任务队列,在python里面可以用celery了,在异步任务方面很方便,可是go里面没有celery,找了一圈发现machinery符合预期。官方github。在网上想找一点可以参考的demo或者分享的文章很少,所以自己就研究了一下,分享一下。用过celery就知道celery也有worker,这里machinery也一样的有worker,还有send。顾名思义一个是往队列加任务的,一个是消费任务的。我这里主要是把send封装为对外提供接口,通过接口提交异步任务队列。这个任务一般是长耗时的io或者计算密集型的任务。而且这个任务的结果不用马上拿到,而是后期可以通过redis存储了任务结果和状态。然后发任务的sender可以异步获取结果。废话不多说,翠花上代码。。。。。。。
目录结构:
.
├── app.go #主程序入口
├── config #app配置文件
│ ├── conf.go
│ └── settings.toml
├── config.yml #machinery配置文件
├── gredis #redis存储任务结果进度
│ └── gredis.go
├── handle #路由处理
│ ├── handle_router.go
├── tasks #任务处理
│ └── tasks.go
└── util
└── tool.go
tasks/tasks.go
这个文件里面主要是任务处理的function,sender请求发送的任务最后是调用这些函数
package itasks
import (
"errors"
"fmt"
"machinery/gredis"
"time"
"github.com/go-redis/redis"
)
// Add ...
func Add(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}
// LongRunningTask ...
func LongRunningTask() error {
log.INFO.Print("Long running task started")
for i := 0; i < 100; i++ {
log.INFO.Print(10 - i)
time.Sleep(1 * time.Second)
//这里面可以通过redis更新任务进度
}
//这里可以用redis取存储任务结果,sender可以异步轮询或者指定多久后取redis取结果
log.INFO.Print("Long running task finished")
return nil
}
handle/handle_router.go
路由处理函数,路由收到请求会通过这里调用路由函数发送任务给worker处理,worker最后会调用tasks里面的函数
package handlerouter
import (
"fmt"
"machinery/gredis"
"net/http"
"strings"
"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
//Add test task method add
func Add(c *gin.Context, s *machinery.Server) {
var (
uid = uuid.New().String()
)
signature := &tasks.Signature{
UUID: uid,
Name: "add",
Args: []tasks.Arg{
{
Type: "int64",
Value: 1,
},
{
Type: "int64",
Value: 1,
},
},
}
asyncResult, err := s.SendTask(signature)
if err != nil {
panic(err.Error())
}
c.JSON(200, gin.H{"add": err, "uuid": uid})
fmt.Println(asyncResult)
}
//Add test task method longRunningTask
func LongRunningTask(c *gin.Context, s *machinery.Server) {
var (
uid = uuid.New().String()
)
signature := &tasks.Signature{
Name: "long_running_task",
}
asyncResult, err := s.SendTask(signature)
if err != nil {
panic(err.Error())
}
c.JSON(200, gin.H{"longRunningTask": err, "uuid": uid})
fmt.Println(asyncResult)
}
config/settings.toml
配置文件
config_path = "./config.yml"
app_port = 18578 # app对外的的端口
task_redis_host = "localhost"
task_redis_port = 6379
task_redis_password = "123456"
task_redis_db = 0
task_redis_pool_size = 20 #redis线程池大小
config/conf.go
初始化配置
package conf
import (
"log"
"os"
"github.com/BurntSushi/toml"
)
//BaseConfig BaseConfig
type BaseConfig struct {
ConfigPath string `toml:"config_path"`
AppPort int64 `toml:"app_port"`
TaskRedisHost string `toml:"task_redis_host"`
TaskRedisPort int `toml:"task_redis_port"`
TaskRedisPassword string `toml:"task_redis_password"`
TaskRedisDB int `toml:"task_redis_db"`
TaskRedisPoolSize int `toml:"task_redis_pool_size"`
}
//Cfg 全局配置
var Cfg = &BaseConfig{}
//InitConfig 初始化配置
func InitConfig() {
path, pErr := os.Getwd()
if pErr != nil {
log.Panic(pErr)
}
path += "/config/settings.toml"
if _, err := toml.DecodeFile(path, &Cfg); err != nil {
log.Panic(err)
}
}
config.yml
machinery配置文件
broker: 'amqp://guest:guest@localhost:5672/'
default_queue: machinery_tasks
result_backend: 'redis://123456@localhost:6379'
results_expire_in: 3600000
amqp:
binding_key: machinery_task
exchange: machinery_exchange
exchange_type: direct
prefetch_count: 3
gredis/gredis.go
redis初始化可以用于更新任务进度或存储任务结果
package gredis
import (
"fmt"
"log"
conf "machinery/config"
"github.com/go-redis/redis"
)
//Client Global client
var Client *redis.Client
//InitRedisClient Establish a connection pool
func InitRedisClient() {
Client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", conf.Cfg.TaskRedisHost, conf.Cfg.TaskRedisPort),
Password: conf.Cfg.TaskRedisPassword,
DB: conf.Cfg.TaskRedisDB,
PoolSize: conf.Cfg.TaskRedisPoolSize,
})
}
app.go
主程序
package main
import (
"fmt"
"machinery/gredis"
handlerouter "machinery/handle"
itasks "machinery/tasks"
"os"
conf "machinery/config"
"github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/gin-gonic/gin"
"github.com/urfave/cli"
)
var (
server *machinery.Server
cnf *config.Config
app *cli.App
tasks map[string]interface{}
)
func init() {
//初始化配置
config.InitConfig()
//命令行标签主要是命令行中用不通的标签运行worker或者sender
var err error
app = cli.NewApp()
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "c",
Value: "",
Usage: "Path to a configuration file",
},
}
//
tasks = map[string]interface{}{
"add": itasks.Add,
"long_running_task": itasks.LongRunningTask,
}
if cnf, err = loadConfig(conf.Cfg.ConfigPath); err != nil {
panic(err)
}
if server, err = machinery.NewServer(cnf); err != nil {
panic(err)
}
gredis.InitRedisClient()
}
func main() {
// Set the CLI app commands
app.Commands = []cli.Command{
{
Name: "worker",
Usage: "launch machinery worker",
Action: func(c *cli.Context) error {
if err := runWorker(); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "send",
Usage: "send async tasks ",
Action: func(c *cli.Context) error {
if err := runSender(); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
}
// Run the CLI app
app.Run(os.Args)
}
func loadConfig(configPath string) (*config.Config, error) {
if configPath != "" {
return config.NewFromYaml(configPath, true)
}
return config.NewFromEnvironment(true)
}
func runWorker() (err error) {
server.RegisterTasks(tasks)
if err != nil {
panic(err)
return
}
workers := server.NewWorker("worker_test", 10)
err = workers.Launch()
if err != nil {
panic(err)
return
}
return
}
// sender对外提供接口的
func runSender() (err error) {
r := gin.Default()
// Ping test
r.GET("/ping", func(c *gin.Context) {
c.String(200, "pong")
})
r.GET("/add", func(c *gin.Context) {
handlerouter.Add(c, server)
})
r.POST("/longRunningTask", func(c *gin.Context) {
handlerouter.LongRunningTask(c, server)
})
err = r.Run(fmt.Sprintf(":%d", conf.Cfg.AppPort))
return
}
//meachinery 实例初始化
func startServer() (err error) {
// Create server instance
server, err = machinery.NewServer(cnf)
if err != nil {
return
}
// 注册任务
err = server.RegisterTasks(tasks)
return
}
运行方法 cd 到项目目录,
运行workre:go run app.go worker
运行sender:go run app.go send
请求测试:
add测试: curl http://localhost:18578/add
longRunningTask测试: curl http://localhost:18578/longRunningTask