简介

​message-bus​​message-bus​​message-bus​​message-bus​
​watermill​​Kafka/RabbitMQ​​HTTP/MySQL binlog​

快速使用

​watermill​​GoChannel​​watermill​

安装:

$ go get github.com/ThreeDotsLabs/watermill

使用:

package main

import (
"context"
"log"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
​GoChannel​​Subscribe​​Publish()​​Subscribe()​​<-chan *message.Message​​GoChannel​​goroutine​​goroutine​
​message.Message​​watermill​​Message​​[]byte​​Message​

有两点注意:

​Message​​Ack()​​GoChannel​​Message​​UUID​​watermill​​NewUUID()​

下面看示例运行:

Go 每日一库之 watermill_go

路由

​Ack()​
​watermill​

Go 每日一库之 watermill_消息队列_02

​goroutine​
var (
logger = watermill.NewStdLogger(false, false)
)

func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}

type myHandler struct {
}

func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)

msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}

首先,我们创建一个路由:

router, err := message.NewRouter(message.RouterConfig{}, logger)

然后为路由注册处理器。注册的处理器有两种类型,一种是:

router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)

这个方法原型为:

func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler
​handlerName​​subscriber​​subscribeTopic​​handlerFunc​​publishTopic​​publisher​

另外一种处理器是下面这种形式:

router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)

从名字我们也可以看出,这种形式的处理器只处理接收到的消息,不发布新消息。

​router.Run()​
​GoChannel​
​Ack()​​Nack()​

上面只是路由的最基本用法,路由的强大之处在于中间件。

中间件

​watermill​
​IgnoreErrors​​Throttle​​Poison​​Retry​​Timeout​​InstantAck​​Ack()​​RandomFail​​Duplicator​​Correlation​​correlation id​​Recoverer​​panic​
​router.AddMiddleware()​
router.AddMiddleware(middleware.Duplicator)

想重试?可以:

router.AddMiddleware(middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware)

上面设置最大重试次数为 3,重试初始时间间隔为 100ms。

​Recoverer​
router.AddMiddleware(middleware.Recoverer)

也可以实现自己的中间件:

func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}

中间件有两种实现方式,如果不需要参数或依赖,那么直接实现为函数即可,像上面这样。如果需要有参数,那么可以实现为一个结构:

type myMiddleware struct {
Name string
}

func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}

这两种中间件的添加方式有所不同,第一种直接添加:

router.AddMiddleware(MyMiddleware)
​Middleware​​MyMiddleware​
router.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)

设置

如果运行上面程序,你很可能会看到这样一条日志:

No subscribers to send message
​goroutine​​watermill​
pubSub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
}, logger)
​GoChannel​​Config​​Persistent​​true​​No subscribers to send message​

RabbitMQ

​GoChannel​​watermill​​RabbitMQ​
​RabbitMQ​​RabbitMQ​​RabbitMQ​​Erlang​​choco​​choco​​RabbitMQ​
$ choco install rabbitmq
​RabbitMQ​
$ rabbitmq-server.bat
​watermill​​RabbitMQ​
$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp

发布订阅:

var amqpURI = "amqp://localhost:5672/"

func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}

messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)

publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}

publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}

time.Sleep(time.Second)
}
}

func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
​RabbitMQ​

总结

​watermill​​goroutine​

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue????

参考

欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~

Go 每日一库之 watermill_go_03