rabbitmq消息模式
rabbitmq中进行消息控制的组建可以分为以下几部分:
- exchange:rabbitmq中的路由部件,控制消息的转发路径;
- queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
- consumer:消息的消费者;
rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。
我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:
[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]
| /|\
------->[exchange] ->topic------
不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。
rabbitmq客户端
github.com/streadway/amqp
在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go
创建queue
exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):
func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {
var num int
ch := clt.ch
if ifFresh {
num, err = ch.QueueDelete(
server.quePrefix+"."+clt.device,
false,
false,
false,
)
if err != nil {
return
}
log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged")
}
args := make(amqp.Table)
args["x-message-ttl"] = messageTTL
args["x-expires"] = queueExpire
q, err := ch.QueueDeclare(
server.quePrefix+"."+clt.device, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
args, // arguments
)
// 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html
if err != nil {
return
}
for _, topic := range server.topics {
err = ch.QueueBind(
q.Name,
topic.keyPrefix+"."+clt.device,
topic.chanName,
false,
nil,
)
if err != nil {
return
}
}
clt.que = q
return
}
消息接收
对于消费者消息的接收过程如下所示:
msgs, err := clt.ch.Consume(
clt.que.Name, // queue
clt.device, // consumer
false, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
clt.Close()
log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err)
return nil
}
clt.msgs = msgs
clt.pubChan = make(chan *publishMsg, 4)
go func() {
cc := make(chan *amqp.Error)
e := <-clt.ch.NotifyClose(cc)
log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error())
clt.cancel()
}()
go func() {
for d := range msgs {
msg := d.Body
msgProcess(d.Exchange, msg)
d.Ack(false)
}
}()
ch.Consumed.Ack(false)ch.NotifyClose(cc)amqp.Channel
amqp.channel
消息发送
在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:
func (clt *Client) publishProc() {
ticker := time.NewTicker(tickTime)
deliveryMap := make(map[uint64]*publishMsg)
defer func() {
atomic.AddInt32(&clt.onPublish, -1)
ticker.Stop()
for _, msg := range deliveryMap {
msg.ackErr = errCancel
msg.cancel()
}
}()
var deliveryTag uint64 = 1
var ackTag uint64 = 1
var pMsg *publishMsg
for {
select {
case <-clt.ctx.Done():
return
case pMsg = <-clt.pubChan:
pMsg.startTime = time.Now()
err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire)
if err != nil {
pMsg.ackErr = err
pMsg.cancel()
}
deliveryMap[deliveryTag] = pMsg
deliveryTag++
case c, ok := <-clt.confirm:
if !ok {
log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
return
}
pMsg = deliveryMap[c.DeliveryTag]
// fmt.Println("DeliveryTag:", c.DeliveryTag)
delete(deliveryMap, c.DeliveryTag)
if c.Ack {
pMsg.ackErr = nil
pMsg.cancel()
} else {
pMsg.ackErr = errNack
pMsg.cancel()
}
case <-ticker.C:
now := time.Now()
for {
if len(deliveryMap) == 0 {
break
}
pMsg = deliveryMap[ackTag]
if pMsg != nil {
if now.Sub(pMsg.startTime.Add(pubTime)) > 0 {
pMsg.ackErr = errTimeout
pMsg.cancel()
delete(deliveryMap, ackTag)
} else {
break
}
}
ackTag++
}
}
}
}
发送过程的构造要点:
map[uint64]*publishMsg
contextpublishMsgackErr
总结
作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。