这篇文章主要讲解了“消息队列原理之如何掌握rabbitmq”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“消息队列原理之如何掌握rabbitmq”吧!
介绍
RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。支持多种客户端语言。
架构
整体架构对照下面的图说明
先看看图片上各个名词的解释:
Rabbitmq Broker:RabbitMQ 的服务端进程,客户端通过 TCP 连接与它通信;Queue 和 Exchange 都创建在 Broker 上,生产者把消息发送给 Broker,消费者从 Broker 获取消息。
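RoutingKey:路由键,Exchange 根据它以及 Binding 规则决定把消息投递到哪些 Queue。Exchange:交换器,生产者把消息发送到 Exchange,再由 Exchange 路由到 Queue;Exchange 通过 Binding 与 Queue 关联,并按 RoutingKey 进行匹配。Exchange 有 direct、fanout、topic、headers 四种类型。Queue 绑定到 Exchange 时指定的 key 称为 Bindkey,Producer 向 Exchange 发送消息时指定的 key 称为 Routekey。direct:Routekey 与 Bindkey 完全相同时才投递。fanout:忽略 Routekey,投递到所有绑定的 Queue。topic:Routekey 与 Bindkey 按模式匹配,Bindkey 中 * 匹配恰好一个单词,# 匹配零个或多个单词;例如 Routekey benz.car 不匹配 Bindkey car,但匹配 *.car。headers:路由到 Queue 时不使用 Routekey 和 Bindkey,而是根据消息的 headers 属性进行匹配。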
对照上面的图和名词解释应该比较清晰明了了,下面我们通过几个例子说明如何使用。
用法(golang)
direct
每个 Queue 创建后都会自动绑定到 default exchange,此时 Routekey == QueueName。
package main import ( "fmt" "github.com/streadway/amqp" "log" ) func handlerError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } var url = "amqp://username:password@ip:port" func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() queueNameCar := "car" if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil { handlerError(err, "Failed to decare Queue") } if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
main() 函数声明了一个名为 car 的队列,然后通过 default exchange(名称为空字符串)以 car 作为 Routekey 发布一条消息,消息会被投递到 car 队列。
rejectexchangeproducerexchangerejectRoutekeyproducerroutekeyroutekey10
我们自己创建一个 direct 类型的 exchange 并绑定一些队列看看是什么效果。
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() directExchangeNameCar := "direct.car" if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(directExchangeNameCar, "car", 
false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
上面的代码声明了 Exchange 和 Queue,并在它们之间建立了 Binding;四个队列都有 Binding key 为 car 的绑定,所以以 car 为 Routekey 发布的消息会被投递到全部四个队列。
QueueExchangeExchangeunbindqueueexchangeexchangeexchange
fanout
fanout 工作方式类似于广播,看看下面的代码
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() fanoutExchangeNameCar := "fanout.car" if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(fanoutExchangeNameCar, 
"middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
对于 fanout 类型的 exchange fanout.car,即使发布时使用的 Routekey 是 middle.car(没有任何 Binding key 与之相同),所有绑定到它的队列仍然都会收到消息。
topic
topic 类型的 exchange 会按照 Binding key 的模式与 Routekey 进行匹配,把消息路由到匹配的 queue。
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() topicExchangeNameCar := "topic.car" if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "*.small.car", 
topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } }
下面看看 producer 使用不同的 Routekey 发布消息时,哪些 queue 会收到消息。
if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
每个 queue 都会收到消息
if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
只有 small-car 队列会收到消息:Routekey small.car 与 Binding key small.car 完全相同,也匹配 #.small.car(# 可以匹配零个单词),但不匹配 *.small.car(* 必须匹配恰好一个单词)。
if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
只有 small-car 队列会收到消息:Routekey benz.small.car 同时匹配 Binding key *.small.car 和 #.small.car。
if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
只有 small-car 队列会收到消息:Routekey auto.blue.benz.small.car 只匹配 Binding key #.small.car(# 可以匹配多个单词),不匹配 *.small.car。
if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
都不会收到消息,没有符合的 routekey 。
headers
这种类型很少有实际的应用场景。
感谢各位的阅读,以上就是“消息队列原理之如何掌握rabbitmq”的内容了,经过本文的学习后,相信大家对消息队列原理之如何掌握rabbitmq这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!