简介
message-busmessage-busmessage-busmessage-bus
watermillKafka/RabbitMQHTTP/MySQL binlog
快速使用
watermillGoChannelwatermill
安装:
$ go get github.com/ThreeDotsLabs/watermill
使用:
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
GoChannelSubscribePublish()Subscribe()<-chan *message.MessageGoChannelgoroutinegoroutine
message.MessagewatermillMessage[]byteMessage
有两点注意:
MessageAck()GoChannelMessageUUIDwatermillNewUUID()
下面看示例运行:
路由
Ack()
watermill
goroutine
var (
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}
type myHandler struct {
}
func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}
首先,我们创建一个路由:
router, err := message.NewRouter(message.RouterConfig{}, logger)
然后为路由注册处理器。注册的处理器有两种类型,一种是:
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
这个方法原型为:
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler
handlerNamesubscribersubscribeTopichandlerFuncpublishTopicpublisher
另外一种处理器是下面这种形式:
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
从名字我们也可以看出,这种形式的处理器只处理接收到的消息,不发布新消息。
router.Run()
GoChannel
Ack()Nack()
上面只是路由的最基本用法,路由的强大之处在于中间件。
中间件
watermill
IgnoreErrorsThrottlePoisonRetryTimeoutInstantAckAck()RandomFailDuplicatorCorrelationcorrelation idRecovererpanic
router.AddMiddleware()
router.AddMiddleware(middleware.Duplicator)
想重试?可以:
router.AddMiddleware(middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware)
上面设置最大重试次数为 3,重试初始时间间隔为 100ms。
Recoverer
router.AddMiddleware(middleware.Recoverer)
也可以实现自己的中间件:
func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
中间件有两种实现方式,如果不需要参数或依赖,那么直接实现为函数即可,像上面这样。如果需要有参数,那么可以实现为一个结构:
type myMiddleware struct {
Name string
}
func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
这两种中间件的添加方式有所不同,第一种直接添加:
router.AddMiddleware(MyMiddleware)
MiddlewareMyMiddleware
router.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)
设置
如果运行上面程序,你很可能会看到这样一条日志:
No subscribers to send message
goroutinewatermill
pubSub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
}, logger)
GoChannelConfigPersistenttrueNo subscribers to send message
RabbitMQ
GoChannelwatermillRabbitMQ
RabbitMQRabbitMQRabbitMQErlangchocochocoRabbitMQ
$ choco install rabbitmq
RabbitMQ
$ rabbitmq-server.bat
watermillRabbitMQ
$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp
发布订阅:
var amqpURI = "amqp://localhost:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
RabbitMQ
总结
watermillgoroutine
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue????
参考
我
欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~