1. 安装 rabbitmq 的 golang 包
golang 可使用库 github.com/streadway/amqp 操作 rabbitmq 。使用下面命令安装 RabbitMQ 。
go get -v github.com/streadway/amqp
2. 生产者流程
在 Golang 中创建 rabbitmq 生产者基本步骤是:
连接 Connection
创建 Channel
创建或连接一个交换器
创建或连接一个队列
交换器绑定队列
投递消息
关闭 Channel
关闭 Connection
2.1 创建连接
// connection connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
2.2 创建通道
// channel channel, err := connection.Channel()
2.3 创建交换器
err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)
参数依次说明:
name 交换机名称
kind 交换机类型
durable 持久化标识
autoDelete 是否自动删除
internal 是否是内置交换机
noWait 是否等待服务器确认
args 其它配置
参数说明要点:
autoDelete :
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
internal :
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。
noWait
当 noWait 为 true 时,声明时无需等待服务器的确认。
该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive ),他主要是假定交换已存在,并尝试连接到不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换器的存在。
2.4 创建队列
q, err := channel.QueueDeclare("q1", true, false, false, true, nil)
参数说明:
name 队列名称
durable 持久化
autoDelete 自动删除
exclusive 排他
noWait 是否等待服务器确认
args Table
参数说明要点:
exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接( Connection )可见的,并且该连接内的所有信道( Channel)都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。
autoDelete 设置是否自动删除。
为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
创建队列还有一个差不多的方法( QueueDeclarePassive ),他主要是假定队列已存在,并尝试连接到不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在。
2.5 绑定交换器和队列
err = channel.QueueBind("q1", "q1Key", "e1", true, nil)
参数解析:
name 队列名称
key BindingKey 根据交换机类型来设定
exchange 交换机名称
noWait 是否等待服务器确认
args Table
2.6 绑定交换器(可选)
err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)
参数解析:
destination 目的交换器
key RoutingKey 路由键
source 源交换器
noWait 是否等待服务器确认
args Table 其它参数
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination ,井把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。如图:
2.7 投递消息
err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{ Timestamp: time.Now(), DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte("Hello Golang and AMQP(Rabbitmq)!"), })
参数解析:
exchange 交换器名称
key RouterKey
mandatory 是否为无法路由的消息进行返回处理
immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
msg 消息体
参数说明要点:
mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。
immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。
其中 amqp.Publishing 的 DeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。
3. 消费者流程
消费者的步骤和生产者流程基本类似,只是将生产者流程中的投递消息变为消费消息。
Rabbitmq 消费方式共有 2 种,分别是推模式和拉模式。
3.1 推模式
推模式是通过持续订阅的方式来消费信息, Consume 将信道( Channel )设置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者。推送消息的个数还是会受到 channel.Qos 的限制。
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
参数说明:
queue 队列名称
consumer 消息者名称
autoAck 是否确认消费
exclusive 排他
noLocal
noWait bool
args Table
参数说明要点:
noLocal
设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
其中 autoAck 可以设置为 true 或者 false 。
如果设为 true 则消费者一接收到就从 queue 中去除了,如果消费者处理消息中发生意外该消息就丢失了。
如果设为 false 则消费者在处理完消息后,调用 msg.Ack(false) 后消息才从 queue 中去除。即便当前消费者处理该消息发生意外,只要没有执行 msg.Ack(false) 那该消息就仍然在 queue 中,不会丢失。
如果autoAck设置为 false 则表示需要手动进行 ack 消费
v, ok := <-deliveries if ok { // 手动ack确认 // 注意: 这里只要调用了ack就是手动确认模式, // v.Ack的参数 multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认 // 并不是表示设置为false就不进行当前ack确认 if err := v.Ack(true); err != nil { fmt.Println(err.Error()) } } else { fmt.Println("Channel close") }
3.2 拉模式
相对来说比较简单,是由消费者主动拉取信息来消费,每次只消费一条消息,同样也需要进行 ack 确认消费。
channel.Get(queue string, autoAck bool)