简介
message-busmessage-busmessage-busmessage-buswatermillKafka/RabbitMQHTTP/MySQL binlog快速使用
watermillGoChannelwatermill安装:
$ go get github.com/ThreeDotsLabs/watermill
使用:
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
GoChannelSubscribePublish()Subscribe()<-chan *message.MessageGoChannelgoroutinegoroutinemessage.MessagewatermillMessage[]byteMessage有两点注意:
MessageAck()GoChannelMessageUUIDwatermillNewUUID()下面看示例运行:

路由
Ack()watermill
goroutinevar (
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}
type myHandler struct {
}
func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}
首先,我们创建一个路由:
router, err := message.NewRouter(message.RouterConfig{}, logger)
然后为路由注册处理器。注册的处理器有两种类型,一种是:
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
这个方法原型为:
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler
handlerNamesubscribersubscribeTopichandlerFuncpublishTopicpublisher另外一种处理器是下面这种形式:
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
从名字我们也可以看出,这种形式的处理器只处理接收到的消息,不发布新消息。
router.Run()GoChannelAck()Nack()上面只是路由的最基本用法,路由的强大之处在于中间件。
中间件
watermillIgnoreErrorsThrottlePoisonRetryTimeoutInstantAckAck()RandomFailDuplicatorCorrelationcorrelation idRecovererpanicrouter.AddMiddleware()router.AddMiddleware(middleware.Duplicator)
想重试?可以:
router.AddMiddleware(middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware)
上面设置最大重试次数为 3,重试初始时间间隔为 100ms。
Recovererrouter.AddMiddleware(middleware.Recoverer)
也可以实现自己的中间件:
func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
中间件有两种实现方式,如果不需要参数或依赖,那么直接实现为函数即可,像上面这样。如果需要有参数,那么可以实现为一个结构:
type myMiddleware struct {
Name string
}
func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
这两种中间件的添加方式有所不同,第一种直接添加:
router.AddMiddleware(MyMiddleware)
MiddlewareMyMiddlewarerouter.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)
设置
如果运行上面程序,你很可能会看到这样一条日志:
No subscribers to send message
goroutinewatermillpubSub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
}, logger)
GoChannelConfigPersistenttrueNo subscribers to send messageRabbitMQ
GoChannelwatermillRabbitMQRabbitMQRabbitMQRabbitMQErlangchocochocoRabbitMQ$ choco install rabbitmq
RabbitMQ$ rabbitmq-server.bat
watermillRabbitMQ$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp
发布订阅:
var amqpURI = "amqp://localhost:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
RabbitMQ总结
watermillgoroutine大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue????
参考
我
欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~
