rabbitmq消息模式

rabbitmq中进行消息控制的组建可以分为以下几部分:

  1. exchange:rabbitmq中的路由部件,控制消息的转发路径;
  2. queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
  3. consumer:消息的消费者;

rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。

rabbitmq

我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:

[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]
       |                             /|\
       ------->[exchange] ->topic------

不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。

rabbitmq客户端

github.com/streadway/amqp

在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go

创建queue

exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):

func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {

	var num int
	ch := clt.ch

	if ifFresh {
		num, err = ch.QueueDelete(
			server.quePrefix+"."+clt.device,
			false,
			false,
			false,
		)
		if err != nil {
			return
		}
		log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged")
	}

	args := make(amqp.Table)
	args["x-message-ttl"] = messageTTL
	args["x-expires"] = queueExpire
	q, err := ch.QueueDeclare(
		server.quePrefix+"."+clt.device, // name
		true,  // durable
		false, // delete when usused
		false, // exclusive
		false, // no-wait
		args,  // arguments
	)
    // 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html
	if err != nil {
		return
	}

	for _, topic := range server.topics {
		err = ch.QueueBind(
			q.Name,
			topic.keyPrefix+"."+clt.device,
			topic.chanName,
			false,
			nil,
		)
		if err != nil {
			return
		}
	}

	clt.que = q
	return
}

消息接收

对于消费者消息的接收过程如下所示:

msgs, err := clt.ch.Consume(
		clt.que.Name, // queue
		clt.device,   // consumer
		false,        // auto ack
		false,        // exclusive
		false,        // no local
		false,        // no wait
		nil,          // args
	)
	if err != nil {
		clt.Close()
		log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err)
		return nil
	}

	clt.msgs = msgs
	clt.pubChan = make(chan *publishMsg, 4)

	go func() {
		cc := make(chan *amqp.Error)
		e := <-clt.ch.NotifyClose(cc)
		log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error())
		clt.cancel()
	}()

	go func() {
		for d := range msgs {
			msg := d.Body
			msgProcess(d.Exchange, msg)
			d.Ack(false)
		}
	}()

ch.Consumed.Ack(false)ch.NotifyClose(cc)amqp.Channel
amqp.channel

消息发送

在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:

func (clt *Client) publishProc() {
	ticker := time.NewTicker(tickTime)
	deliveryMap := make(map[uint64]*publishMsg)

	defer func() {
		atomic.AddInt32(&clt.onPublish, -1)
		ticker.Stop()
		for _, msg := range deliveryMap {
			msg.ackErr = errCancel
			msg.cancel()
		}
	}()

	var deliveryTag uint64 = 1
	var ackTag uint64 = 1
	var pMsg *publishMsg
	for {
		select {

		case <-clt.ctx.Done():
			return

		case pMsg = <-clt.pubChan:
			pMsg.startTime = time.Now()
			err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire)
			if err != nil {
				pMsg.ackErr = err
				pMsg.cancel()
			}
			deliveryMap[deliveryTag] = pMsg
			deliveryTag++

		case c, ok := <-clt.confirm:
			if !ok {
				log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
				return
			}
			pMsg = deliveryMap[c.DeliveryTag]
			// fmt.Println("DeliveryTag:", c.DeliveryTag)
			delete(deliveryMap, c.DeliveryTag)
			if c.Ack {
				pMsg.ackErr = nil
				pMsg.cancel()
			} else {
				pMsg.ackErr = errNack
				pMsg.cancel()
			}
		case <-ticker.C:
			now := time.Now()
			for {
				if len(deliveryMap) == 0 {
					break
				}
				pMsg = deliveryMap[ackTag]
				if pMsg != nil {
					if now.Sub(pMsg.startTime.Add(pubTime)) > 0 {
						pMsg.ackErr = errTimeout
						pMsg.cancel()
						delete(deliveryMap, ackTag)
					} else {
						break
					}
				}
				ackTag++
			}
		}
	}
}

发送过程的构造要点:

map[uint64]*publishMsg
contextpublishMsgackErr

总结

作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。