消息队列的使用场景
异步处理
异步是指不用一次性执行完,可以先去执行别的,等这边回应了再做处理。这里我就拿一个网上用烂的例子:用户注册。用户在正确填写信息后点击注册,这时会发起网络请求到后端去做数据合法性等的验证,一旦验证通过则会将数据写入数据库并返回注册成功类的信息。但现在很多的应用都会有短信、邮箱等其他媒介的额外通知,这些又是在什么时候实现的呢?我们先说说同步的处理方式,数据写入数据库之后随即调用发短信、邮箱的接口,发完这些信息之后再给前端返回注册成功,这个就是同步的方式。但这里面有一个问题,那就是效率的问题,可能当前计算机暂时腾不出资源发短信和邮件,即暂时阻塞了,那么前端就必须等到后端成功发送短信邮件了才能收到注册成功的信息,从用户体验来说是非常差的。如果采用异步的处理方式就不一样了,在数据写入数据库之后我们可以将这个请求或者相关标志信息写入队列,然后返回注册成功的字样。这样一来用户很就能得知自己注册成功了,至于短信和邮件其实是次要的,晚一点收到也无所谓,所以在后端程序有空闲的时候可以从消息队列中取出这些信息并发送短信或邮件。这个就是异步,即不必把响应信息和发送短信邮件一次性做完,可以先在消息队列中做记录,然后先返回响应信息,发短信邮件的后续再做。
流量削峰
这个流量削峰说得通俗一点就是并发请求的控制,像大型电商网站搞什么限时秒杀、大型团购等活动的时候,在那段时间并发请求量是相当多的,如果后端没有做好优化网站很容易就崩了。这个时候为了控制这种情况,可以考虑使用消息队列来控制并发请求数量。在消息队列未满之前将请求写入消息队列,当消息队列满了之后不再往里面写,而是将后续的请求直接转到其他页面(如错误页面等),即而控制了这些并发请求,至于后续的操作就从队列中出队后再做处理。
应用解耦
耦合是指两个模块之间的关联程度,解耦就是尽可能地降低模块之间的关联性。还是拿用烂了的例子:订单模块和仓库模块。通常来说,在用户下订单时操作的是订单模块,下了订单之后就需要减少商品库存,这时候需要调仓库模块,从这种关系上看就是订单模块中调用了仓库模块里的接口,从而这两个模块就产生耦合了。现在借助消息队列就可以使得这两个模块解耦,首先让仓库模块订阅消息队列,当订单模块中下了订单并需要修改库存的时候发消息写入消息队列,这时订阅了该消息队列的仓库模块会接收到这条消息,进而解析消息中的内容并去修改对应商品的库存。
RabbitMQ简介
消息队列(Message Queue)是一种中间件,位于底层复杂的操作系统和应用软件之间。消息队列从名字就可以看出存入其中的消息是先进先出的(FIFO),我们把存入消息的对象称为生产者(producer),把取出消息的对象称为消费者(consumer),而消息队列只是中间的用来暂存消息的中介罢了。消息队列的实现有很多种,如:RabbitMQ、RocketMQ、ActiveMQ、Kafka等,本文讲的是RabbitMQ的使用。
RabbitMQ官方使用教程:https://www.rabbitmq.com/getstarted.html
RabbitMQ下载安装
erlang下载地址:https://www.erlang.org/downloads
RabbitMQ下载地址:https://www.rabbitmq.com/install-windows.html#installer
说明:RabbitMQ底层是基于erlang这门编程语言写的,因此在安装运行RabbitMQ之前必须先安装erlang,否则RabbitMQ服务将无法正常使用。
RabbitMQ启动
windows下配置(Linux版本的后面有需要再进行补充)
在安装完erlang后系统会自动在环境变量里配置ERLANG_HOME,如果没有则需要手动配置,ERLANG_HOME变量对应的值为erlang的安装路径。
在安装完RabbitMQ之后系统会自动注册服务并自动启动服务,可以通过命令行输入services.msc打开服务列表,然后找RabbitMQ服务看是否为正在运行,如果不是则右键启动服务。
还有方便的办法,打开开始菜单从最近添加或者找R开头的软件列表,从里面可以看到RabbitMQ Service -start和RabbitMQ Service -stop这两个命令行快捷方式,只需要双击打开即可开启服务和停止服务。
接下来看RabbitMQ服务是否启动成功,直接进入RabbitMQ目录下找sbin目录,路径类似于:E:\RabbitMQ\rabbitmq_server-3.8.2\sbin里面有一个名为rabbitmqctl.bat的脚本文件,在sbin目录中按住shift + 鼠标右键在当前目录下打开命令行窗口,并输入rabbitmqctl status查看RabbitMQ服务的启动状态。如有出现类似如下的详细信息则表示服务已成功启动,否则就需要去看看服务的运行状态了。如果服务启动但是这里却显示启动失败可以尝试下停止服务后再启动服务。
RabbitMQ是有一个可视化界面可以进行数据的统计和管理的,这个可视化界面是一个网页的形式,其对应的地址默认是127.0.0.1:15672。但直接在浏览器地址栏输入该地址肯定是404的,因为虽然你启动了RabbitMQ的服务,现在RabbitMQ是可以正常使用了,但是这个可视化界面和RabbitMQ服务本身是两个不同的东西,你需要把可视化界面挂到15672端口(作为进程),因此你还需要去运行命令来启动可视化界面(这个可视化界面对RabbitMQ来说是一个插件,从命令构成上可以看出)。首先还是进入到RabbitMQ下的sbin目录下,在命令行输入:rabbitmq-plugins enable rabbitmq_management,然后浏览器输入:127.0.0.1:15672,如果打开登陆界面则表示可视化界面插件启动成功。
命令行启动可视化界面插件(成功图)
浏览器打开127.0.0.1:15672页面(成功图)
默认账号的账号和密码都是guest,初次登陆可以用guest账号,进入后可以手动创建其他的账号。
扩展包
下载安装:go get github.com/streadway/amqp
说明:前面的RabbitMQ安装只不过是这个软件的安装而已,现在我们需要在go程序中操作这个中间件,自然就需要调用RabbitMQ官方提供的api来进行一系列的访问操作。
RabbitMQ使用
其中P为Producer(生产者),即消息的发送方;Cx为Consumer(消费者),即消息的接收方。红色区域为消息队列。
工作队列
生产者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
//客户端连接消息队列
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
//获取通道,所有操作基本都是通道控制的
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
//队列声明
q,err := ch.QueueDeclare(
"name",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
//发送消息
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte("Jung Ken"), //消息的内容
},
)
if err != nil {
log.Fatal(err)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
//此处队列声明中的参数必须和同名队列参数一致,否则将出错
q,err := ch.QueueDeclare(
"name",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
//消费者接收消息,msgs为只读通道
msgs,err := ch.Consume(q.Name,"",true,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
上面是最简单的RabbitMQ使用案例,如果消费者窗口多开几个即可模仿同时有多个消费者要消费队列中的消息。
Tip:有一点要特别注意,RabbitMQ生产者在发送消息之前必须确保有消费者在等待读取消息,否则非持久性消息将被丢失,持久性的消息另当别论。
针对当前的消费者、生产者模式可能出现的问题
问: 队列在派遣任务的时候是不知道消费者当前是否有还没处理完的消息的,倘若队列一致给消费者C1分配任务而没有给C2分配任务,会造成C1的任务积压太多而C2一直闲着,这时候应该怎么解决这个问题使得队列的任务可以公平派遣(即所有的消费者都能同等地分配到任务)?
答:需要在消费者代码中添加一段代码,使队列每次给消费者分配任务的时候不要分配1个以上的任务,在消费者处理完任务之前不会再接收其他任务,这时候RabbitMQ就会找一个空闲的消费者进行任务分配。添加的代码如下:
err = ch.Qos(
1,
0,
false,
)
if err != nil {
//无法设置Qos
log.Fatal(err)
}
完整的消费者端代码
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
q,err := ch.QueueDeclare(
"name",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
//设置每次从消息队列获取任务的数量
err = ch.Qos(
1, //预取任务数量
0, //预取大小
false, //全局设置
)
if err != nil {
//无法设置Qos
log.Fatal(err)
}
msgs,err := ch.Consume(q.Name,"",true,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
问:RabbitMQ服务器关闭后消息队列和里面存的消息都没了该怎么办?(可能并非人为的关闭服务器,可能是宕机导致的RabbitMQ服务器关闭)
答:在代码声明消息队列的时候里面的durable参数给配置成true(该参数表示持久性,设置为true则表示支持持久化)。在发布消息的时候在amqp.Publishing里面多配置一个DeliveryMode:amqp.Persistent
Tip:生产者和消费者代码中都需要指定消息队列,因此他们在参数配置上是一致的,才能表示操作的是同一个消息队列,因此如果生产者这边设置了持久化,那么消费者同样也需要设置成持久化。
问:消息确认问题,假设有一个消费者正在处理消息,消息还没处理完毕突然就中断了(可能该消费者进程被杀死了),这时候该消费者正在处理的消息就丢失了,应该怎么解决这种消息丢失的问题?
答:这时候不关生产者的事了,主要是消费者这边的设置。在消费者端的代码中的Consume方法下的autoAck自动确认字段设置为false,表示我们不做自动确认。而是在消息处理完毕之后手动确认,这样如果消息没处理完就中断了,RabbitMQ服务器没有收到消息的确认,那么队列中的该消息将会再次被重新分配。消费者端修改如下:
发布/订阅
从RabbitMQ对消息队列的设计以及生产者、消费者、队列的结构上看,生产者通常不会将消息直接发送给队列存储,而是会通过交换机接收消息,再将消息推入队列中。下图中的x表示Exchange(交换机),交换机有4种类型:direct、topic、headers、fanout。
在这种模型中,有时候会需要用到临时队列,即需要用到就当即创建,不用到的时候就自动删除。临时队列的声明很简单,只需要在队列声明的时候队列名字写成空串即可,这样该临时队列就会被分配一个随机的名字,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg。在连接关闭的时候该临时队列会被删除。
生产者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.Publish(
"ex-change",
"", //路由键
false,
false,
amqp.Publishing{
//DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
Body:[]byte("Jung Ken"),
},
)
if err != nil {
log.Fatal(err)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
q,err := ch.QueueDeclare(
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.QueueBind(
q.Name,
"",
"ex-change",
false,
nil,
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(q.Name,"",false,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
完整的模型图
结合模型图片和代码组成可以看出这里面是层层相扣的。首先在生产者这边,由于生产者不直接将消息发送到队列而是发送到交换机,所以在生产者代码中需要指定一个交换机,并且发送消息的时候也需要指定发送到该交换机中。其次在消费者这边同样需要指明要使用哪个交换机(消息贮存区,内部存了某些消息),并声明好一个临时队列(不指定队列名字而是由系统随机分配)用来接收消息,然后将交换机和临时队列进行绑定,最后由消费者从临时队列中读取消息进行消费。
交换机的4种类型
fanout(扇出):广播发送消息到所有绑定的队列,不处理路由键。
说明:fanout类型的交换机不处理路由键,并且只要队列绑定到交换机上,就能接收到消息。
使用场景:假设用户在应用中订阅了新闻,那么当新闻发生变更(修改、删除、发布、更新)的时候所有的这些用户将统一收到新闻变更的信息。
代码示例:fanout代码同上发布/订阅,此处不做重复示例。
direct(直接):根据发送的消息的路由键和交换机与临时队列绑定的路由键做完整匹配,若能匹配上则将消息推送到该队列中,否则不会将消息推送给该队列。
说明:direct类型交换机处理路由键,并且要求路由键完整匹配,如图所示只要路由键是error、warning、info的均能被转发到队列,除此以外的路由键将不会被匹配,消息也会被丢弃。其中error被绑定在2个队列中,即相当于fanout,路由键为error将被广播发送到这两个队列中。
使用场景:日志收集。系统规定普通日志info、警告日志warning只是打印,而错误日志error除了需要打印之外还需要写入到磁盘以便后期的错误追踪。对于其他类型的信息可能由其他交换机做存储转发,此处的交换机仅仅是处理日志信息。
代码示例:
生产者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"math/rand"
"time"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change",
"direct",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
rkeys := []string{"error","warning","info"}
rand.Seed(time.Now().UnixNano())
msg := rkeys[rand.Intn(len(rkeys))]
fmt.Println("producer:msg - " + msg)
err = ch.Publish(
"ex-change",
msg, //路由键
false,
false,
amqp.Publishing{
//DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
//Body:[]byte(msg),
Body:[]byte("error"),
},
)
if err != nil {
log.Fatal(err)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"math/rand"
"time"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change",
"direct",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
q,err := ch.QueueDeclare(
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
rkeys := []string{"error","warning","info"}
rand.Seed(time.Now().UnixNano())
msg := rkeys[rand.Intn(len(rkeys))]
fmt.Println("consumer:msg - " + msg)
err = ch.QueueBind(
q.Name, //队列名称
msg, //路由键
"ex-change", //交换机名称
false,
nil,
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(q.Name,"",false,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
topic(话题):在direct的基础上做模糊匹配,即direct需要完整的一个一个字符匹配,而topic只需要部分匹配其他的用通配符进行匹配。若能匹配上则将消息推送到该队列中,否则不会将消息推送给该队列。#可以匹配零个或若干个词,而*只能匹配一个词。topic类型对路由键是有格式限制的,即路由键只能由单词组成,单词之间用电分隔,如:aaa.bbb、aaa.bbb.ccc,像aaa_bbb,aaa,bbb都是错误的。
说明:topic类型的交换机只要求做部分匹配即可,使用#(匹配0或若干个单词)和*(匹配一个单词)来实现部分匹配。此处规定路由键必须是单词或单词组,单词之间使用点(.)作为分隔符。如果路由键写完整则相当于完整匹配,同direct类型的交换机。如果路由键为“#”(匹配0个或多个单词)则可以和空串匹配,相当于fanout类型的交换机。
使用场景:只需要满足路由键的部分匹配即可。假设规定只要是aaa.开头的就能成功匹配,那么只需要定义交换机与队列绑定时的路由键为aaa.#,点(.)只是作为分隔符,不作为匹配内容,topic中固定的路由键内容只有单词。
代码示例:
生产者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change2",
"topic",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.Publish(
"ex-change2",
"lazy.white.rabbit", //路由键
false,
false,
amqp.Publishing{
//DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
//Body:[]byte(msg),
Body:[]byte("内容成功发送"),
},
)
if err != nil {
log.Fatal(err)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change2",
"topic",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
q,err := ch.QueueDeclare(
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.QueueBind(
q.Name, //队列名称
//"lazy.#", //路由键
"*.white.*", //路由键
"ex-change2", //交换机名称
false,
nil,
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(q.Name,"",false,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
headers(键值对):headers不处理路由键,它是一组键值对。在交换机和临时队列进行绑定的时候需要设置用于匹配的headers键值对,若消息的键值对和交换机、临时队列绑定的键值对匹配得上,则将消息转发到该队列,否则不转发到该队列。
说明:headers类型交换机不处理路由键,而是通过键值对进行匹配。只有键和值同时匹配上才能匹配成功,是匹配精度最高的交换机类型。
使用场景:对于不同的用户进行消息的分开处理。有时候我们希望每个用户都有其独立的消息队列用来接收与其相关的消息并单独做处理。
代码示例:
生产者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change3",
"headers",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.Publish(
"ex-change3",
"", //路由键
false,
false,
amqp.Publishing{
//设置Headers键值对
Headers:amqp.Table{"name":"Jung Ken"},
//DeliveryMode:amqp.Persistent,
ContentType:"text/plain",
//Body:[]byte(msg),
Body:[]byte("内容成功发送Headers"),
},
)
if err != nil {
log.Fatal(err)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
err = ch.ExchangeDeclare(
"ex-change3",
"headers",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
q,err := ch.QueueDeclare(
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.QueueBind(
q.Name, //队列名称
"", //路由键
"ex-change3", //交换机名称
false,
amqp.Table{"name":"Jung Ken"}, //设置Headers键值对
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(q.Name,"",false,false,false,false,nil)
if err != nil {
log.Fatal(err)
}
for v := range msgs {
fmt.Printf("body = %s\n",v.Body)
}
}
Tip:上面这4种交换机类型中的使用场景均是举例,只需通过所举的例子了解大概用在什么方向,具体什么时候用,在哪用,怎么用就要看自己的业务啦。
RabbitMQ的RPC调用
前面我们举得案例都是基于一台机子做测试的,从连接地址上就可以看出,最终运行起来生产者和消费者都是在一台机子上做操作。如果真正投入生产环境,客户端就是分布在不同的区域上,肯定与生产者不在一台机子上的。这时候如果想要生产者想要发送消息给消费者处理或者消费者发送消息给生产者处理(这里的生产者和消费者是相对的,谁发消息谁就是生产者,谁处理消息谁就是消费者),就要涉及到一个RPC(Remote Procedure Call 远程过程调用)的问题。
说明:看到客户端和服务端就可以想到这个过程就是一个客户端发送请求,服务端处理请求并作出响应的过程。首先是客户端发送请求消息,请求消息中标有唯一id,用来和响应信息对应(响应消息中也标有相同的id,才能与请求消息进行对应,不然都不知道这个响应对应的是哪个客户端哪个请求)。其次请求消息中指定了回调队列,因为我们的客户端只能指定对某个队列进行消息的监听读取,如果消息不是发送到该队列,客户端是肯定读取不到的。即我们指定好了读取的位置,只要你将消息放到我指定的位置我就能读取到,否则我是读取不到消息的。另外,如果消息放到回调队列但是没办法和请求消息的唯一id匹配,那么该消息将被丢弃。
使用场景:这个过程符合网络应用的设计,不过是多了队列这一层的消息传递。因此如果是网络应用使用消息队列的话都可以参考该模型去处理。
代码示例
客户端
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
//回调队列
callbackQueue,err := ch.QueueDeclare(
"callback_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(
callbackQueue.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
//请求队列
requestQueue,err := ch.QueueDeclare(
"rpc_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
correlationId := "abc"
err = ch.Publish(
"",
requestQueue.Name, //请求队列
false,
false,
amqp.Publishing{
CorrelationId:correlationId,
ReplyTo:callbackQueue.Name,
ContentType:"text/plain",
Body:[]byte("来自客户端的消息"),
},
)
if err != nil {
log.Fatal(err)
}
for msg := range msgs {
if correlationId == msg.CorrelationId {
fmt.Printf("客户端:body = %s\n",msg.Body)
}else {
fmt.Println("correlationId不匹配,消息丢弃")
}
}
}
服务端
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main(){
conn,err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch,err := conn.Channel()
if err != nil {
log.Fatal(err)
}
forever := make(chan bool)
//请求队列
requestQueue,err := ch.QueueDeclare(
"rpc_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
log.Fatal(err)
}
msgs,err := ch.Consume(
requestQueue.Name,
"",
false,
false,
false,
false,
nil,
)
go func(){
for msg := range msgs {
fmt.Printf("服务端:body = %s\n",msg.Body)
callbackQueue,err := ch.QueueDeclare(
msg.ReplyTo,
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
err = ch.Publish(
"",
callbackQueue.Name,
false,
false,
amqp.Publishing{
CorrelationId:msg.CorrelationId,
ContentType:"text/plain",
Body:[]byte("来自服务器的消息"),
},
)
if err != nil {
log.Fatal(err)
}
}
}()
<- forever
}
代码说明:客户端发送的请求消息和接收到来自服务端的响应消息中都设置了correlationId这个作为唯一标识的字段,对于请求消息对应的响应消息而言,这两个消息的correlationId是一样的。客户端接收到响应消息之后通过该字段进行比较判断当前的响应消息是否对应我的请求消息,如果相等则表示请求消息和响应消息是对应的,反之则表示不对应,该响应消息会被丢弃。虽然说RabbitMQ能通过该字段进行请求消息和响应消息的匹配,但实际上还是得我们手动地判断这两个消息的correlationId是否相等。
Tip:correlationId是一个唯一标识,因此在设计的时候需要确保其唯一性,否则可能出现一个请求对应多个响应的情况,这是不合理的。上面示例代码只是做演示,所以简单用一个字符串做验证罢了。
以上是RabbitMQ消息队列的介绍及简单使用。