1、RabbitMQ简介
1.1、RabbitMQ介绍
RabbitMQ 2007年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ是同Erlang语言开发对于一般的使用场景而言是一个非常不错的选择。
1.2主流的MQ实现方式:AMQP、JMS
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
2、RabbitMQ快速配置
使用docker快速配置RabbitMQ
使用docker pull命令从官方仓库上下载RabbitMQ镜像,这里我们下一个带有management的镜像文件,由于我已经下了所以我的机器上是下面的样子,下完之后用docker images查看一下。
docker run -d --hostname my-rabbit --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -e RABBITMQ_DEFAULT_VHOST=my-vhost -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 25672:25672 -p 15691:15691 -p 15692:15692 -p 15671:15671 -p 15672:15672 rabbitmq:3-management
go get github.com/streadway/amqp
3、RabbitMQ简单使用
以下的代码中连接字符串里的user,password,vhost如果设置过的话就用设置过的账号,没有设置过的话默认是amqp://guest:guest@localhost:5672/
RabbitMQ最简单的工作模式
生产者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建RabbitMQ中的管道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列,如果队列不存在则自动创建,存在则跳过创建
q, err := ch.QueueDeclare(
"hello", //消息队列的名称
false, //是否持久化
false, //是否自动删除
false, //是否具有排他性(仅创建它的程序才可用)
false, //是否阻塞处理
nil, //额外的属性
)
FailOnError(err, "Failed to declare a queue")
body := "Hello World"
err = ch.Publish(
"",
q.Name,
//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
log.Printf(" [x] Sent %s", body)
FailOnError(err, "Failed to publish a message")
}
消费者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//创建RabbitMQ中的管道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//为消息队列注册消费者
//接收消息
forever := make(chan bool)
msgs, err := ch.Consume(
"hello", // queue
//用来区分多个消费者
"", // consumer
//是否自动应答
true, // auto-ack
//是否独有
false, // exclusive
//设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // no-local
//列是否阻塞
false, // no-wait
nil, // args
)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
4、RabbitMQ的四种工作模式
RabbitMQ的四种工作模式分别为:
- Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。
- Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。
- Fanout Exchange(扇出型交换机): Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列
- Headers Exchange(头交换机) :Headers类型的交换机是通过headers信息来匹配的,工作原理与direct类型类似。
4.1、direct工作模式
Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。
生产者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义交换机的名称
exchangeName := "Direct_Exchange"
//定义队列的名称
queueNames := []string{"direct_Queue1", "direct_Queue2", "direct_Queue3", "direct_Queue4"}
//定义Key值
keys := []string{"key_1", "key_3", "key_4"}
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列
q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
FailOnError(err, "Failed to Create a Queue")
q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
FailOnError(err, "Failed to Create a Queue")
q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
FailOnError(err, "Failed to Create a Queue")
q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
FailOnError(err, "Failed to Create a Queue")
//声明交换机类型
err = ch.ExchangeDeclare(
exchangeName, //交换机的名称
//交换机的类型,分为:direct(直连),fanout(扇出,类似广播),topic(话题,与direct相似但是模式匹配),headers(用header来设置生产和消费的key)
"direct",
true, //是否持久化
false, //是否自动删除
false, //是否公开,false即公开
false, //是否等待
nil,
)
FailOnError(err, "Failed to Declare a Exchange")
//根据key将队列与keys绑定
ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
ch.QueueBind(q2.Name, keys[0], exchangeName, false, nil)
ch.QueueBind(q3.Name, keys[1], exchangeName, false, nil)
ch.QueueBind(q4.Name, keys[2], exchangeName, false, nil)
//发送消息
err = ch.Publish(exchangeName, keys[0], false, false,
amqp.Publishing{
Type: "text/plain",
Body: []byte("Hello Dierct key1 message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key1 message")
FailOnError(err, "Failed to publish a message")
err = ch.Publish(exchangeName, keys[1], false, false,
amqp.Publishing{
Type: "text/plain",
Body: []byte("Hello Dierct key3 message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key3 message")
FailOnError(err, "Failed to publish a message")
err = ch.Publish(exchangeName, keys[2], false, false,
amqp.Publishing{
Type: "text/plain",
Body: []byte("Hello Dierct key4 message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key4 message")
FailOnError(err, "Failed to publish a message")
}
运行消费者程序后会发现四个队列都收到了消息,原因是direct_Queue1和direct_Queue2这两个队列绑定的都是key1所以交换机在收到routingkey为key1的消息就会同时发送给direct_Queue1和direct_Queue2。
消费者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义队列的名称
queueNames := []string{"direct_Queue1", "direct_Queue2", "direct_Queue3", "direct_Queue4"}
//获取一个通道
ch, err := conn.Channel()
FailOnError(err, "Failed to Create a channel")
defer ch.Close()
//消费消息
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
}
}()
<-forever
}
4.2、topic工作模式
Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。
匹配规则为:
- Topic中,将routingkey通过“.”来分为多个部分
- “*”:代表一个部分
- “#”:代表0个或多个部分(如果绑定的路由键为 “#” 时,则接受所有消息,因为路由键所有都匹配)
生产者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义交换机的名称
exchangeName := "Topic_Exchange"
//定义队列的名称
queueNames := []string{"topic_Queue1", "topic_Queue2", "topic_Queue3", "topic_Queue4"}
//定义keys
keys := []string{"key1.key2.key3.*", "key1.#", "*.key2.*.key4", "#.key3.key4"}
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列
q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
//声明交换机
err = ch.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil)
FailOnError(err, "Failed to Declare a Exchange")
//将队列和key绑定到交换机上
ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
ch.QueueBind(q2.Name, keys[1], exchangeName, false, nil)
ch.QueueBind(q3.Name, keys[2], exchangeName, false, nil)
ch.QueueBind(q4.Name, keys[3], exchangeName, false, nil)
//发送消息
err = ch.Publish(exchangeName, "key1.key2.key3.key4", false, false,
amqp.Publishing{
Type: "text/plain",
Body: []byte("Hello Topic key1 message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Topic key1 message")
FailOnError(err, "Failed to publish a message")
}
我们在生产者端向topic交换机以"key1"为关键字发送了一条消息后在后台管理处可以看到下面的结果
原因:
- 第一个队列匹配规则为“key1.key2.key3.*”,而我们发的routingkey是key1它不能匹配上后面的key2和key3。
- 第二个队列匹配规则为“key1.#”,key1匹配到我们发的routingkey后面的“#”表示0个或者多个部分,所以第二个队列是可以匹配到的。
- 第三个队列匹配规则为“*.key2.*.key4”,第一个“*”代表一个部分可以匹配到key1,但是key1并不能与后面的部分进行匹配。
- 第三个队列匹配规则为“#.key3.key4”,同样的第一个“#”可以匹配到key1,但是key1并不能与后面的规则进行匹配。
将发送消息中的routingkey从“key1”改为“key1.key2.key3.key4”后再发送一次消息得到如下结果,如图所示当改变routingkey之后发送的消息每个队列都有收到。
消费者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义队列的名称
queueNames := []string{"topic_Queue1", "topic_Queue2", "topic_Queue3", "topic_Queue4"}
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to Open a Channel")
defer ch.Close()
//消费消息
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
}
}()
<-forever
}
4.3、fanout工作模式
Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列,此时routingkey参数相当于无效
生产者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义交换机的名称
exchangeName := "Fanout_Exchange"
//定义队列的名称
queueNames := []string{"fanout_Queue1", "fanout_Queue2", "fanout_Queue3", "fanout_Queue4"}
//定义keys
keys := []string{"key1", "key2", "key3", "key4"}
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列
q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
//声明交换机
err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
FailOnError(err, "Failed to Declare a Exchange")
//将队列和key绑定到交换机上
ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
ch.QueueBind(q2.Name, keys[1], exchangeName, false, nil)
ch.QueueBind(q3.Name, keys[2], exchangeName, false, nil)
ch.QueueBind(q4.Name, keys[3], exchangeName, false, nil)
//发送消息
err = ch.Publish(exchangeName, "key1", false, false,
amqp.Publishing{
Type: "text/plain",
Body: []byte("Hello Fanout key1 message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Fanout key1 message")
FailOnError(err, "Failed to publish a message")
}
发送消息后
消费者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义队列的名称
queueNames := []string{"fanout_Queue1", "fanout_Queue2", "fanout_Queue3", "fanout_Queue4"}
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to Open a Channel")
defer ch.Close()
//消费消息
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
}
}()
msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
}
}()
<-forever
}
4.4、headers工作模式
headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完全一致,但性能差很多,目前几乎用不到了
消费方指定的headers中必须指定一个"x-match"的键
键"x-match"的值只有2个
- x-match=all:表示所有的键值对都匹配才能接收到消息
- x-match=any:表示只要键值对匹配就能接收消息
生产者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
//建立连接
conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义交换机的名称
exchangeName := "Headers_Exchange"
//定义队列的名称
queueName := "Headers_Queue1"
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列
_, err = ch.QueueDeclare(queueName, true, false, false, false, nil)
FailOnError(err, "Failed to declare a Queue")
//声明交换机
err = ch.ExchangeDeclare(exchangeName, "headers", true, false, false, false, nil)
FailOnError(err, "Failed to Declare a Exchange")
//绑定交换机与队列的headers对应关系
headers := make(map[string]interface{})
headers["x-match"] = "all"
headers["name"] = "zhangsan"
headers["sex"] = "男"
//最后一个参数绑定headers
ch.QueueBind(queueName, "", exchangeName, false, headers)
//发送消息
err = ch.Publish(exchangeName, "", false, false,
amqp.Publishing{
//设置发送消息时的headers信息
Headers: amqp.Table{
"name": "zhangsan",
"sex": "男",
}, //这里把刚才定义的headers添加进来
Type: "text/plain",
Body: []byte("Hello ALL Headers message"),
},
)
log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello ALL Headers message")
FailOnError(err, "Failed to publish a message")
}
运行消费者端后可以看到创建了一个headers类型的交换机和一个队列并且可以看到两者之间的绑定关系,示例代码中发送消息的headers和绑定的headers相同所以Headers_Exchange队列会收到一条消息,但是如果两者是"x-match"值是all且其他参数不相同消息是不会通过交换机送到Headers_Exchange队列的,当“x-match”的值为any时的效果是只要其他参数中有一样的就匹配,读者可以自行尝试。
消费者端
package main
import (
"log"
"github.com/streadway/amqp"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Printf("%s:%s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
FailOnError(err, "Failed to Connected to RabbitMQ")
defer conn.Close()
//定义队列的名称
queueName := "Headers_Queue1"
//申请通道
ch, err := conn.Channel()
FailOnError(err, "Failed to Open a Channel")
defer ch.Close()
//消费消息
forever := make(chan bool)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
msgs, err := ch.Consume(queueName, "", true, false, false, false, nil)
FailOnError(err, "Failed to register a consumer")
go func() {
for d := range msgs {
log.Printf("Received a message From %s : %s", queueName, d.Body)
}
}()
<-forever
}