go-zeroapp modulegateway & RPC

Delay Job

go-queuego-queuego-queuego-zero
dq  beanstalkdkqkafkakafka

咱们次要说一下dq,kq应用也一样的,只是依赖底层不同,如果没应用过beanstalkd,没接触过beanstalkd的能够先google一下,应用起来还是挺容易的。

我在jobs下应用goctl新建了一个message-job.api服务

info(
    title: //音讯工作
    desc: // 音讯工作
    author: "Mikael"
    email: "13247629622@163.com"
)

type BatchSendMessageReq {}

type BatchSendMessageResp {}

service message-job-api {
    @handler batchSendMessageHandler // 批量发送短信
    post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
}

因为不须要应用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

package handler

import (
    "fishtwo/lib/xgo"
    "fishtwo/app/jobs/message/internal/svc"
)


/**
* @Description 启动job
* @Author Mikael
* @Date 2021/1/18 12:05
* @Version 1.0
**/

func JobRun(serverCtx *svc.ServiceContext)  {
    xgo.Go(func() {
        batchSendMessageHandler(serverCtx)
    //...many job
    })
}
go batchSendMessageHandler(serverCtx)

而后批改一下启动文件message-job.go

package main

import (
   "flag"
   "fmt"

   "fishtwo/app/jobs/message/internal/config"
   "fishtwo/app/jobs/message/internal/handler"
   "fishtwo/app/jobs/message/internal/svc"

   "github.com/tal-tech/go-zero/core/conf"
   "github.com/tal-tech/go-zero/rest"
)

var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")

func main() {
   flag.Parse()

   var c config.Config
   conf.MustLoad(*configFile, &c)

   ctx := svc.NewServiceContext(c)
   server := rest.MustNewServer(c.RestConf)
   defer server.Stop()

   handler.JobRun(ctx)

   fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
   server.Start()
}

次要是handler.RegisterHandlers(server, ctx) 批改为handler.JobRun(ctx)

接下来,咱们就能够引入dq了,首先在etc/xxx.yaml下增加dqConf

.....

DqConf:
  Beanstalks:
    - Endpoint: 127.0.0.1:7771
      Tube: tube1
    - Endpoint: 127.0.0.1:7772
      Tube: tube2
  Redis:
    Host: 127.0.0.1:6379
    Type: node

我这里本地用不同端口,模仿开了2个节点,7771、7772

在internal/config/config.go增加配置解析对象

type Config struct {
    ....
    DqConf dq.DqConf
}

批改handler/batchsendmessagehandler.go

package handler

import (
    "context"
    "fishtwo/app/jobs/message/internal/logic"
    "fishtwo/app/jobs/message/internal/svc"
    "github.com/tal-tech/go-zero/core/logx"
)

func batchSendMessageHandler(ctx *svc.ServiceContext){
    rootCxt:= context.Background()
    l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
    err := l.BatchSendMessage()
    if err != nil{
        logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
    }
}

批改logic下batchsendmessagelogic.go,写咱们的consumer生产逻辑

package logic

import (
   "context"
   "fishtwo/app/jobs/message/internal/svc"
   "fmt"
   "github.com/tal-tech/go-zero/core/logx"
)

type BatchSendMessageLogic struct {
   logx.Logger
   ctx    context.Context
   svcCtx *svc.ServiceContext
}

func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
   return BatchSendMessageLogic{
       Logger: logx.WithContext(ctx),
       ctx:    ctx,
       svcCtx: svcCtx,
   }
}


func (l *BatchSendMessageLogic) BatchSendMessage() error {
   fmt.Println("job BatchSendMessage start")

   l.svcCtx.Consumer.Consume(func(body []byte) {
       fmt.Printf("job BatchSendMessage %s \n" + string(body))
   })

   fmt.Printf("job BatchSendMessage finish \n")
   return nil
}

这样就功败垂成了,启动message-job.go就ok课

go run message-job.go

之后咱们就能够在业务代码中向dq增加工作,它就能够主动生产了

producer.Delay 向dq中投递5个提早工作:

    producer := dq.NewProducer([]dq.Beanstalk{
        {
            Endpoint: "localhost:7771",
            Tube:     "tube1",
        },
        {
            Endpoint: "localhost:7772",
            Tube:     "tube2",
        },
    })

    for i := 1000; i < 1005; i++ {
        _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
        if err != nil {
            fmt.Println(err)
        }
    }
producer.At

谬误日志

在后面说到gateway革新时候,如果眼神好的童鞋,在下面的httpresult.go中曾经看到了log的身影:

咱们在来看下rpc中怎么解决的

是的,我在每个rpc启动的main中退出了grpc拦截器 https://www.yuque.com/tal-tec…,那让咱们看看grpc拦截器外面做了什么

而后我代码外面应用github/pkg/errors这个包去处理错误的,这个包还是很好用的

所以呢:

grpclogx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)
apilogx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)
go-zero logx.WithContext 
user-api --> user-srv --> message-srv
messsage-srvtrace-idjaeger、zipkin、skywalking

框架地址

https://github.com/tal-tech/go-zero

欢送应用 go-zero 并 star 反对咱们!????

go-zero 系列文章见『微服务实际』公众号