写在前面

本文是使用Go语言实现各种RabbitMQ的中间件模型

1. 介绍

1.1 什么是MQ

MQ消息队列生产者消费者
消息中间件

目前市面上有很多消息中间件:RabbitMQ,RocketMQ,Kafka等等…

1.2 什么是RabbitMQ

RabbitMQAMQP协议

1.3 AMQP 协议

AMQP(advanced message queuing protocol)

顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
在这里插入图片描述

2. Go语言操作RabbitMQ

2.1 下载

下载rabbitmq过程就省了,可以直接到官网网站下载安装,像安装qq一样。

2.2 引入驱动

  • 驱动
go get github.com/streadway/amqp
  • 连接
var MQ *amqp.Connection

// RabbitMQ 链接
func RabbitMQ(connString string) {
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(err)
	}
	MQ = conn
}

2.3 HelloWorld 模型

在这里插入图片描述
P代表生产者,C代表消费者,红色部分是队列。
生产者生成消息到队列中,消费者进行消费,直连单点模式。

2.3.1 生产者

  • 声明连接对象
var ProductMQ *amqp.Connection
  • 声明通道
ch, err := ProductMQ.Channel()
  • 创建队列
q, err := ch.QueueDeclare("hello", 	// 队列名字
	false,   	// 是否持久化,
	false,   // 不用的时候是否自动删除
	false,   	// 用来指定是否独占队列
	false,   	// no-wait
	nil,     		// 其他参数
)

参数1(name):队列名字
参数2(durable):持久化,队列中所有的数据都是在内存中的,如果为true的话,这个通道关闭之后,数据就会存在磁盘中持久化,false的话就会丢弃
参数3(autoDelete):不需要用到队列的时候,是否将消息删除
参数4(exclusive):是否独占队列,true的话,就是只能是这个进程独占这个队列,其他都不能对这个队列进行读写
参数5(noWait):是否阻塞
参数6(args):其他参数

  • 发布消息
body := "Hello World!"

err = ch.Publish(
	"",     // 交换机
	q.Name, // 队列名字
	false,  // 是否强制性
	// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者
	// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
	false, //当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
	// 是否立刻
	/**
	概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
	**/
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body), // 发送的消息
	})

参数1(exchange):交换机,后续会讲到
参数2(route-key):队列名字
参数3(mandatory):是否强制性,

basic.returnbroker会直接将消息扔掉

参数4(immediate):是否立即处理

当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者

也就是说,mandatory 标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

参数5(msg):发布的消息,ContentType是传输类型,Body是发送的消息。

2.3.2 消费者

  • 声明通道
ch, err := ConsumerMQ.Channel()
  • 创建队列
q, err := ch.QueueDeclare(
	"hello",
	false,
	false,
	false,
	false,
	nil,
)
  • 读取队列消息
msgs, err := ch.Consume(
	q.Name,
	"",    
	true,  
	false,  
	false,  
	false, 
	nil,    
)

由于消费者端需要一直监听,所以我们要用一个for循环+channel去阻塞主进程,使得主进程一直处于监听状态。

forever := make(chan bool)
go func() {
	for d := range msgs {
		fmt.Printf("Received a message: %s", d.Body)
	}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

2.3.3 结果

  • 生产者

在这里插入图片描述

  • 消费者

在这里插入图片描述

2.4 Work Queues 模型

在这里插入图片描述

Work queuesTask queues

此时就可以使用work queues模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.4.1 生产者

生成10条消息到队列中

body := "Hello World!  "
for i := 0; i < 10; i++ {
	msg := strconv.Itoa(i)
	err = ch.Publish(
		"",     // 交换机
		q.Name, // 队列名字
		false,  // 是否强制性
		false,  // 是否立刻
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body+msg), // 发送的消息
		})
}

2.4.2 消费者

2.3.2

2.4.3 结果

消费者1号
在这里插入图片描述
消费者2号
在这里插入图片描述

2.5 Publish/Subscribe 模型

fanout 扇出 也称为广播

在这里插入图片描述

在广播模式下,消息发送流程如下:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

2.5.1 生产者

  • 声明交换机
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)

参数1(name):交换机名称
参数2(kind):交换机类型

  • 生产消息
_ = ch.Publish("logs", "", false, false,
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

2.5.2 消费者

  • 声明交换机
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )
  • 声明队列
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )
  • 绑定交换机
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )
  • 消费消息
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.5.3 结果

  • 生产者

在这里插入图片描述

  • 消费者

在这里插入图片描述

2.6 Routing 模型

在这里插入图片描述

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
fanoutDirect

在Direct模型下:

RoutingKeyRoutingKeyRouting KeyRoutingkeyRouting key

2.6.1 生产者

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_direct", "", false, false,
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

2.6.2 消费者

  • 只接受warn
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
  • 只接受info
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.7 Topics 模型

在这里插入图片描述

TopicExchangeDirectRoutingKeyTopicExchangeRouting key
Routingkeyitem.insert
  • 统配符
    * 匹配不多不少恰好1个词
    # 匹配一个或多个词
  • 如:
    fan.# 匹配 fan.one.two 或者 fan.one 等
    fan.* 只能匹配 fan.one

2.7.1 生产者

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_topic", "", false, false,
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})

2.7.2 消费者

  • 只接受*.one
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
  • 只接受*.fan
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

2.8 RPC 模型

在这里插入图片描述

日后补充