咱们用一个系列来解说从需要到上线、从代码到k8s部署、从日志到监控等各个方面的微服务残缺实际。
整个我的项目应用了go-zero开发的微服务,根本蕴含了go-zero以及相干go-zero作者开发的一些中间件,所用到的技术栈根本是go-zero项目组的自研组件,根本是go-zero全家桶了。
实战我的项目地址:https://github.com/Mikaelemmm…
1、概述
音讯队列有很多种,有rabbitmq、rocketmq、kafka等罕用的,其中go-queue(https://github.com/zeromicro/…)是go-zero官网开发的音讯队列组件,其中分为2类,一种是kq、一种是dq,kq是基于kafka的音讯队列,dq是基于beanstalkd的提早队列,然而go-queue不反对定时工作。具体想更多理解go-queue的我之前也写过一篇教程能够去看一下这里不细说了。
本我的项目采纳的是go-queue做音讯队列,asynq做提早队列、定时队列
为什么应用asynq的几个起因
- 间接基于redis,个别我的项目都有redis,而asynq自身就是基于redis所以能够少保护一个中间件
- 反对音讯队列、提早队列、定时任务调度 , 因为心愿我的项目反对定时工作而asynq间接就反对
- 有webui界面,每个工作都能够暂停、归档、通过ui界面查看成功失败、监控
为什么asynq反对音讯队列还在应用go-queue?
- kafka的吞吐是业绩闻名的,如果后期量不大能够间接用asynq
- 没啥目标,就是想给你们演示一下go-queue
在咱们应用go-zero的时候,goctl给咱们带了很大的便当,然而目前go-zero只有生成api、rpc,很多同学在群里问定时工作、提早队列、音讯队列如何生成,目录构造该怎么做,其实go-zero是为咱们设计好了的,就是serviceGroup,应用serviceGroup治理你的服务。
2、如何应用
在后面订单、音讯等场景咱们其实曾经演示过了,这里再额定独自补充一次
咱们还是拿order-mq来举例子,显然应用goctl生成api、rpc不是咱们想要的,那咱们就本人应用serviceGroup革新,目录构造还是连续api的根本差不多,只是将handler改成了listen , 将logic换成了mqs。
2.1 在main中代码如下
var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
// log, prometheus, trace, metricsUrl
if err := c.SetUp(); err != nil {
panic(err)
}
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()
for _, mq := range listen.Mqs(c) {
serviceGroup.Add(mq)
}
serviceGroup.Start()
}
// Service is the interface that groups Start and Stop methods.
Service interface {
Starter // Start
Stopper // Stop
}
2.2 mq分类管理
go-zero-looklook/app/order/cmd/mq/internal/listen目录下代码
该目录下代码是对立治理不同类型mq,因为咱们要治理kq、asynq可能后续还有rabbitmq、rocketmq等等,所以在这里做了分类不便保护
对立治理在go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go,而后在main中调用listen.Mqs能够获取所有mq一起start
// 返回所有消费者
func Mqs(c config.Config) []service.Service {
svcContext := svc.NewServiceContext(c)
ctx := context.Background()
var services []service.Service
// kq :音讯队列.
services = append(services, KqMqs(c, ctx, svcContext)...)
// asynq:提早队列、定时工作
services = append(services, AsynqMqs(c, ctx, svcContext)...)
// other mq ....
return services
}
go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的asynq
// asynq
// 定时工作、提早工作
func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 监听提早队列
deferMq.NewAsynqTask(ctx, svcContext),
// 监听定时工作
}
}
go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的kq (go-queue的kafka)
// kq
// 音讯队列
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 监听生产流水状态变更
kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
// .....
}
}
2.3 理论业务
编写理论业务,咱们就在go-zero-looklook/app/order/cmd/mq/internal/listen/mqs下,这里为了不便保护,也是做了分类
- deferMq : 提早队列
- kq:音讯队列
2.3.1 提早队列
// 监听敞开订单
type AsynqTask struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask {
return &AsynqTask{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *AsynqTask) Start() {
fmt.Println("AsynqTask start ")
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
mux := asynq.NewServeMux()
// 敞开民宿订单工作
mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
func (l *AsynqTask) Stop() {
fmt.Println("AsynqTask stop")
}
因为 asynq 要先启动,而后定义路由工作,所以咱们在asynqTask.go中做了对立的路由治理,之后咱们每个业务都独自的在deferMq的文件夹上面定义一个文件(如“提早敞开订单:closeHomestayOrderState.go”),这样每个业务一个文件,跟go-zero的api、rpc的logic一样,保护很不便
closeHomestayOrderState.go 敞开订单逻辑
package deferMq
import (
"context"
"encoding/json"
"looklook/app/order/cmd/rpc/order"
"looklook/app/order/model"
"looklook/common/asynqmq"
"looklook/common/xerr"
"github.com/hibiken/asynq"
"github.com/pkg/errors"
)
func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error {
var p asynqmq.HomestayOrderCloseTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return errors.Wrapf(xerr.NewErrMsg("解析asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload())
}
resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{
Sn: p.Sn,
})
if err != nil || resp.HomestayOrder == nil {
return errors.Wrapf(xerr.NewErrMsg("获取订单失败"), "closeHomestayOrderStateMqHandler 获取订单失败 or 订单不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder)
}
if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay {
_, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{
Sn: p.Sn,
TradeState: model.HomestayOrderTradeStateCancel,
})
if err != nil {
return errors.Wrapf(xerr.NewErrMsg("敞开订单失败"), "closeHomestayOrderStateMqHandler 敞开订单失败 err:%v, sn:%s ", err, p.Sn)
}
}
return nil
}
2.3.2 kq音讯队列
看go-zero-looklook/app/order/cmd/mq/internal/mqs/kq文件夹下,因为kq跟asynq不太一样,它自身就是应用go-zero的Service治理的,曾经实现了starter、stopper接口了,所以咱们在/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go中间接定义好一个go-queue业务扔给serviceGroup,去交给main启动就好了 , 咱们的业务代码只须要实现go-queue的Consumer间接写咱们本人业务即可。
1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 监听生产流水状态变更
kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
// .....
}
}
能够看到kq.MustNewQueue自身返回就是 queue.MessageQueue , queue.MessageQueue又实现了Start、Stop
2)业务中
/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go
func (l *PaymentUpdateStatusMq) Consume(_, val string) error {
fmt.Printf(" PaymentUpdateStatusMq Consume val : %s \n", val)
// 解析数据
var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage
if err := json.Unmarshal([]byte(val), &message); err != nil {
logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val)
return err
}
// 执行业务..
if err := l.execService(message); err != nil {
logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService err : %v , val : %s , message:%+v", err, val, message)
return err
}
return nil
}
咱们在paymentUpdateStatus.go中只须要实现接口Consume 就能够承受来自kq传过来的kafka的音讯了,咱们只管在咱们Consumer中解决咱们业务即可
3、定时工作
对于定时工作,目前go-zero-looklook没有应用,这里我也阐明一下
- 如果你想简略一点间接应用cron(裸机、k8s都有),
- 如果略微简单一点能够应用https://github.com/robfig/cron包,在代码中定义工夫
- 应用 xxl-job、gocron 分布式定时工作零碎接入
- asynq 的 shedule
这里因为我的项目用的asynq,我就演示一下asynq的shedule吧
分为client与server , client用来定义调度工夫,server是到了工夫承受client的音讯触发来执行咱们写的业务的,理论业务咱们应该写在server,client用来定义业务调度工夫的
asynqtest/docker-compose.yml
version: '3'
services:
#asynqmon asynq提早队列、定时队列的webui
asynqmon:
image: hibiken/asynqmon:latest
container_name: asynqmon_asynq
ports:
- 8980:8080
command:
- '--redis-addr=redis:6379'
- '--redis-password=G62m50oigInC30sf'
restart: always
networks:
- asynqtest_net
depends_on:
- redis
#redis容器
redis:
image: redis:6.2.5
container_name: redis_asynq
ports:
- 63779:6379
environment:
# 时区上海
TZ: Asia/Shanghai
volumes:
# 数据文件
- ./data/redis/data:/data:rw
command: "redis-server --requirepass G62m50oigInC30sf --appendonly yes"
privileged: true
restart: always
networks:
- asynqtest_net
networks:
asynqtest_net:
driver: bridge
ipam:
config:
- subnet: 172.22.0.0/16
asynqtest/shedule/client/client.go
package main
import (
"asynqtest/tpl"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:63779"
const redisPwd = "G62m50oigInC30sf"
func main() {
// 周期性工作
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{
Addr: redisAddr,
Password: redisPwd,
}, nil)
payload, err := json.Marshal(tpl.EmailPayload{Email: "546630576@qq.com", Content: "发邮件呀"})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask(tpl.EMAIL_TPL, payload)
// 每隔1分钟同步一次
entryID, err := scheduler.Register("*/1 * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
asynqtest/shedule/server/server.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"asynqtest/tpl"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
mux := asynq.NewServeMux()
// 敞开民宿订单工作
mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
func emailMqHandler(ctx context.Context, t *asynq.Task) error {
var p tpl.EmailPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("emailMqHandler err:%+v", err)
}
fmt.Printf("p : %+v \n", p)
return nil
}
asynqtest/tpl/tpl.go
package tpl
const EMAIL_TPL = "schedule:email"
type EmailPayload struct {
Email string
Content string
}
server.goclient.go
浏览器输出http://127.0.0.1:8980/schedulers这里 能够看到所有client定义的工作
浏览器输出http://127.0.0.1:8990/这里能够看到咱们的server生产请
控制台生产状况
说一下asynq的shedule在集成到我的项目中的思路,能够独自启动一个服务作为调度client定义零碎的定时任务调度治理,将server定义在每个业务本人的mq的asynq一起即可。
4、结尾
在这一节中,咱们学会应用了音讯队列、提早队列 ,kafka能够通过管理工具去查看,至于asynq查看webui在go-zero-looklook/docker-compose-env.yml中咱们曾经启动好了asynqmon,间接应用http://127.0.0.1:8980 即可查看
我的项目地址
https://github.com/zeromicro/go-zero
go-zero
微信交换群
关注『微服务实际』公众号并点击 交换群 获取社区群二维码。