Golang使用RabbitMQ

1、RabbitMQ简介

1.1、RabbitMQ介绍

​ RabbitMQ 2007年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ是同Erlang语言开发对于一般的使用场景而言是一个非常不错的选择。

1.2主流的MQ实现方式:AMQP、JMS

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

2、RabbitMQ快速配置

使用docker快速配置RabbitMQ

使用docker pull命令从官方仓库上下载RabbitMQ镜像,这里我们下一个带有management的镜像文件,由于我已经下了所以我的机器上是下面的样子,下完之后用docker images查看一下。

docker run -d --hostname my-rabbit --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -e RABBITMQ_DEFAULT_VHOST=my-vhost -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 25672:25672 -p 15691:15691 -p 15692:15692 -p 15671:15671 -p 15672:15672 rabbitmq:3-management
go get github.com/streadway/amqp

3、RabbitMQ简单使用

以下的代码中连接字符串里的user,password,vhost如果设置过的话就用设置过的账号,没有设置过的话默认是amqp://guest:guest@localhost:5672/
RabbitMQ最简单的工作模式

生产者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//创建RabbitMQ中的管道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明队列,如果队列不存在则自动创建,存在则跳过创建
	q, err := ch.QueueDeclare(
		"hello", //消息队列的名称
		false,   //是否持久化
		false,   //是否自动删除
		false,   //是否具有排他性(仅创建它的程序才可用)
		false,   //是否阻塞处理
		nil,     //额外的属性
	)
	FailOnError(err, "Failed to declare a queue")

	body := "Hello World"
	err = ch.Publish(
		"",
		q.Name,
		//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
		false,
		//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	log.Printf(" [x] Sent %s", body)
	FailOnError(err, "Failed to publish a message")
}

消费者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//创建RabbitMQ中的管道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	//为消息队列注册消费者
	//接收消息
	forever := make(chan bool)
	msgs, err := ch.Consume(
		"hello", // queue
		//用来区分多个消费者
		"", // consumer
		//是否自动应答
		true, // auto-ack
		//是否独有
		false, // exclusive
		//设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
		false, // no-local
		//列是否阻塞
		false, // no-wait
		nil,   // args
	)
	FailOnError(err, "Failed to register a consumer")
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

4、RabbitMQ的四种工作模式

​ RabbitMQ的四种工作模式分别为:

  1. Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。
  2. Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。
  3. Fanout Exchange(扇出型交换机): Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列
  4. Headers Exchange(头交换机) :Headers类型的交换机是通过headers信息来匹配的,工作原理与direct类型类似。

4.1、direct工作模式

Direct Exchange(直连交换机):对于每个队列与direct交换机绑定的key进行完全匹配。

生产者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()
	//定义交换机的名称
	exchangeName := "Direct_Exchange"
	//定义队列的名称
	queueNames := []string{"direct_Queue1", "direct_Queue2", "direct_Queue3", "direct_Queue4"}
	//定义Key值
	keys := []string{"key_1", "key_3", "key_4"}

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()
	//声明队列
	q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
	FailOnError(err, "Failed to Create a Queue")
	q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
	FailOnError(err, "Failed to Create a Queue")
	q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
	FailOnError(err, "Failed to Create a Queue")
	q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
	FailOnError(err, "Failed to Create a Queue")

	//声明交换机类型
	err = ch.ExchangeDeclare(
		exchangeName, //交换机的名称
		//交换机的类型,分为:direct(直连),fanout(扇出,类似广播),topic(话题,与direct相似但是模式匹配),headers(用header来设置生产和消费的key)
		"direct",
		true,  //是否持久化
		false, //是否自动删除
		false, //是否公开,false即公开
		false, //是否等待
		nil,
	)
	FailOnError(err, "Failed to Declare a Exchange")

	//根据key将队列与keys绑定
	ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
	ch.QueueBind(q2.Name, keys[0], exchangeName, false, nil)
	ch.QueueBind(q3.Name, keys[1], exchangeName, false, nil)
	ch.QueueBind(q4.Name, keys[2], exchangeName, false, nil)

	//发送消息
	err = ch.Publish(exchangeName, keys[0], false, false,
		amqp.Publishing{
			Type: "text/plain",
			Body: []byte("Hello Dierct key1 message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key1 message")
	FailOnError(err, "Failed to publish a message")

	err = ch.Publish(exchangeName, keys[1], false, false,
		amqp.Publishing{
			Type: "text/plain",
			Body: []byte("Hello Dierct key3 message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key3 message")
	FailOnError(err, "Failed to publish a message")

	err = ch.Publish(exchangeName, keys[2], false, false,
		amqp.Publishing{
			Type: "text/plain",
			Body: []byte("Hello Dierct key4 message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Dierct key4 message")
	FailOnError(err, "Failed to publish a message")
}

​ 运行消费者程序后会发现四个队列都收到了消息,原因是direct_Queue1和direct_Queue2这两个队列绑定的都是key1所以交换机在收到routingkey为key1的消息就会同时发送给direct_Queue1和direct_Queue2。

消费者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义队列的名称
	queueNames := []string{"direct_Queue1", "direct_Queue2", "direct_Queue3", "direct_Queue4"}

	//获取一个通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to Create a channel")
	defer ch.Close()

	//消费消息
	forever := make(chan bool)
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
		}
	}()
	<-forever
}

4.2、topic工作模式

Topic Exchange(主题交换机) :对于每个队列与Topic交换机绑定的key进行模糊匹配。

匹配规则为:

  1. Topic中,将routingkey通过“.”来分为多个部分
  2. “*”:代表一个部分
  3. “#”:代表0个或多个部分(如果绑定的路由键为 “#” 时,则接受所有消息,因为路由键所有都匹配)

生产者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义交换机的名称
	exchangeName := "Topic_Exchange"
	//定义队列的名称
	queueNames := []string{"topic_Queue1", "topic_Queue2", "topic_Queue3", "topic_Queue4"}
	//定义keys
	keys := []string{"key1.key2.key3.*", "key1.#", "*.key2.*.key4", "#.key3.key4"}

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明队列
	q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")

	//声明交换机
	err = ch.ExchangeDeclare(exchangeName, "topic", true, false, false, false, nil)
	FailOnError(err, "Failed to Declare a Exchange")

	//将队列和key绑定到交换机上
	ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
	ch.QueueBind(q2.Name, keys[1], exchangeName, false, nil)
	ch.QueueBind(q3.Name, keys[2], exchangeName, false, nil)
	ch.QueueBind(q4.Name, keys[3], exchangeName, false, nil)

	//发送消息
	err = ch.Publish(exchangeName, "key1.key2.key3.key4", false, false,
		amqp.Publishing{
			Type: "text/plain",
			Body: []byte("Hello Topic key1 message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Topic key1 message")
	FailOnError(err, "Failed to publish a message")
}

​ 我们在生产者端向topic交换机以"key1"为关键字发送了一条消息后在后台管理处可以看到下面的结果

原因:

  1. 第一个队列匹配规则为“key1.key2.key3.*”,而我们发的routingkey是key1它不能匹配上后面的key2和key3。
  2. 第二个队列匹配规则为“key1.#”,key1匹配到我们发的routingkey后面的“#”表示0个或者多个部分,所以第二个队列是可以匹配到的。
  3. 第三个队列匹配规则为“*.key2.*.key4”,第一个“*”代表一个部分可以匹配到key1,但是key1并不能与后面的部分进行匹配。
  4. 第三个队列匹配规则为“#.key3.key4”,同样的第一个“#”可以匹配到key1,但是key1并不能与后面的规则进行匹配。

将发送消息中的routingkey从“key1”改为“key1.key2.key3.key4”后再发送一次消息得到如下结果,如图所示当改变routingkey之后发送的消息每个队列都有收到。

消费者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义队列的名称
	queueNames := []string{"topic_Queue1", "topic_Queue2", "topic_Queue3", "topic_Queue4"}

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to Open a Channel")
	defer ch.Close()

	//消费消息
	forever := make(chan bool)
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
		}
	}()
	<-forever
}

4.3、fanout工作模式

Fanout类型的交换机会将消息分发给所有绑定了此交换机的队列,此时routingkey参数相当于无效

生产者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义交换机的名称
	exchangeName := "Fanout_Exchange"
	//定义队列的名称
	queueNames := []string{"fanout_Queue1", "fanout_Queue2", "fanout_Queue3", "fanout_Queue4"}
	//定义keys
	keys := []string{"key1", "key2", "key3", "key4"}

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明队列
	q1, err := ch.QueueDeclare(queueNames[0], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q2, err := ch.QueueDeclare(queueNames[1], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q3, err := ch.QueueDeclare(queueNames[2], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")
	q4, err := ch.QueueDeclare(queueNames[3], true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")

	//声明交换机
	err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
	FailOnError(err, "Failed to Declare a Exchange")

	//将队列和key绑定到交换机上
	ch.QueueBind(q1.Name, keys[0], exchangeName, false, nil)
	ch.QueueBind(q2.Name, keys[1], exchangeName, false, nil)
	ch.QueueBind(q3.Name, keys[2], exchangeName, false, nil)
	ch.QueueBind(q4.Name, keys[3], exchangeName, false, nil)

	//发送消息
	err = ch.Publish(exchangeName, "key1", false, false,
		amqp.Publishing{
			Type: "text/plain",
			Body: []byte("Hello Fanout key1 message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello Fanout key1 message")
	FailOnError(err, "Failed to publish a message")
}

​ 发送消息后

消费者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义队列的名称
	queueNames := []string{"fanout_Queue1", "fanout_Queue2", "fanout_Queue3", "fanout_Queue4"}

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to Open a Channel")
	defer ch.Close()

	//消费消息
	forever := make(chan bool)
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	msgs, err := ch.Consume(queueNames[0], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[0], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[1], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[1], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[2], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[2], d.Body)
		}
	}()

	msgs, err = ch.Consume(queueNames[3], "", true, false, false, false, nil)
	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueNames[3], d.Body)
		}
	}()
	<-forever
}

4.4、headers工作模式

​ headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完全一致,但性能差很多,目前几乎用不到了

消费方指定的headers中必须指定一个"x-match"的键

键"x-match"的值只有2个

  1. x-match=all:表示所有的键值对都匹配才能接收到消息
  2. x-match=any:表示只要键值对匹配就能接收消息

生产者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://user:password@localhost/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()

	//定义交换机的名称
	exchangeName := "Headers_Exchange"
	//定义队列的名称
	queueName := "Headers_Queue1"

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明队列
	_, err = ch.QueueDeclare(queueName, true, false, false, false, nil)
	FailOnError(err, "Failed to declare a Queue")

	//声明交换机
	err = ch.ExchangeDeclare(exchangeName, "headers", true, false, false, false, nil)
	FailOnError(err, "Failed to Declare a Exchange")

	//绑定交换机与队列的headers对应关系
	headers := make(map[string]interface{})
	headers["x-match"] = "all"
	headers["name"] = "zhangsan"
	headers["sex"] = "男"
	//最后一个参数绑定headers
	ch.QueueBind(queueName, "", exchangeName, false, headers)

	//发送消息
	err = ch.Publish(exchangeName, "", false, false,
		amqp.Publishing{
			//设置发送消息时的headers信息
			Headers: amqp.Table{
				"name": "zhangsan",
				"sex":  "男",
			}, //这里把刚才定义的headers添加进来
			Type: "text/plain",
			Body: []byte("Hello ALL Headers message"),
		},
	)
	log.Printf(" [x] Sent to %s : %s", exchangeName, "Hello ALL Headers message")
	FailOnError(err, "Failed to publish a message")
}

​ 运行消费者端后可以看到创建了一个headers类型的交换机和一个队列并且可以看到两者之间的绑定关系,示例代码中发送消息的headers和绑定的headers相同所以Headers_Exchange队列会收到一条消息,但是如果两者是"x-match"值是all且其他参数不相同消息是不会通过交换机送到Headers_Exchange队列的,当“x-match”的值为any时的效果是只要其他参数中有一样的就匹配,读者可以自行尝试。

消费者端

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func FailOnError(err error, msg string) {
	if err != nil {
		log.Printf("%s:%s", msg, err)
	}
}
func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/vhost")
	FailOnError(err, "Failed to Connected to RabbitMQ")
	defer conn.Close()
	//定义队列的名称
	queueName := "Headers_Queue1"

	//申请通道
	ch, err := conn.Channel()
	FailOnError(err, "Failed to Open a Channel")
	defer ch.Close()

	//消费消息
	forever := make(chan bool)
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	msgs, err := ch.Consume(queueName, "", true, false, false, false, nil)

	FailOnError(err, "Failed to register a consumer")

	go func() {
		for d := range msgs {
			log.Printf("Received a message From %s : %s", queueName, d.Body)
		}
	}()
	<-forever
}