消息队列原理之如何掌握rabbitmq

这篇文章主要讲解了“消息队列原理之如何掌握rabbitmq”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“消息队列原理之如何掌握rabbitmq”吧!

介绍

RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。支持多种客户端语言。

架构

整体架构对照下面的图说明

先看看图片上各个名次的解释:

Rabbitmq BrokerTCPQueueExchangeBrokerBroker
RoutingKeyExchangeBindingQueueExchangeExchangeQueueExchangeBindingRoutingKeyExchangedirect,fanout,topic,headersQueueExchangekeyBindkeyProducerExchangekeykeyRoutekeyRoutekeyBindkeyRoutekeyBindkeyBingkey*#benz.carcar*.carbenz.car*.carbenz.carQueueRoutekeyBindkeyheaders

对照上面图和名次解释应该比较清晰明了了,下面我们通过几个例子说明如何使用。

用法(golang)

direct

QueueRoutekey==QueueName
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func handlerError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

var url = "amqp://username:password@ip:port"

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	queueNameCar := "car"
	if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decare Queue")
	}

	if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
main() 函数cardefalut exchangecarcar
rejectexchangeproducerexchangerejectRoutekeyproducerroutekeyroutekey10

我们自己创建一个 direct 类型的 exchange 并绑定一些队列看看是什么效果。

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	directExchangeNameCar := "direct.car"
	if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
ExchangeQueueBindingBindingcar
QueueExchangeExchangeunbindqueueexchangeexchangeexchange

fanout

fanout 工作方式类似于广播,看看下面的代码

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	fanoutExchangeNameCar := "fanout.car"
	if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
fanoutfanout.carmiddle.car

topic

topicexchangequeue
func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	topicExchangeNameCar := "topic.car"
	if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
}
producerqueue
	if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 每个 queue 都会收到消息

	if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
small-carsmall.car*.small.car#.small.car
	if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
small-car*.small.car#.small.car
	if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
small-car#.small.car
	if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 都不会收到消息,没有符合的 routekey 。

headers

这种类型很少有实际的应用场景。

感谢各位的阅读,以上就是“消息队列原理之如何掌握rabbitmq”的内容了,经过本文的学习后,相信大家对消息队列原理之如何掌握rabbitmq这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!