什么是发布/订阅?
创建一个队列每个任务只传递给一个工人,做些不同的事,向多个消费者传递一个消息。这就是所谓的“订阅/发布模式”。
构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。
已发布的日志消息将被广播到所有接收者。
Exchanges(交换器)
RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。
相反,生产者只能将消息发送到交换器。交换器是非常简单的东西。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换器必须确切知道如何处理接收到的消息。它应该被附加到特定的队列吗?还是应该将其附加到许多队列中?或者它应该被丢弃。这些规则由交换器的类型定义。
有几种交换器类型可用:direct, topic, headers 和 fanout。
Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
Headers Exchanges
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
交换器清单
sudo rabbitmqctl list_exchanges
临时队列
当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。
其次,一旦我们断开消费者的连接,队列就会自动删除。
在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列。
当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。
绑定
我们已经创建了一个扇出交换器和一个队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为*绑定*。
列出绑定关系
rabbitmqctl list_bindings
发布日志的编写 send_logs.go
package mainimport ( "github.com/streadway/amqp" "log" "os" "strings")// 记录错误日志func failOnLogsError(err error,msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn,err := amqp.Dial("amqp://xxxxx:xxxxx@xx.xxx.xxx.xxx:xxxx/") failOnLogsError(err,"Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() failOnLogsError(err,"Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnLogsError(err,"Failed to declare an exchange") body := bodyLogsFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnLogsError(err,"Failed to publish a message") log.Printf(" [x] Sent %s", body)}func bodyLogsFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s}
接收日志的编写 recetive_logs.go
package mainimport ( "github.com/streadway/amqp" "log")// 记录错误日志func failOnRecetiveLogsError(err error,msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn,err := amqp.Dial("amqp://xxxxx:xxxxx@xx.xxx.xxx.xxx:xxxx/") failOnRecetiveLogsError(err,"Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() failOnRecetiveLogsError(err,"Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnRecetiveLogsError(err,"Failed to declare an exchange") q,err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnRecetiveLogsError(err,"Failed to declare a queue") err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil, ) failOnRecetiveLogsError(err,"Failed to bind a queue") msgs,err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnRecetiveLogsError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever}
go run receive_logs.go > logs_from_rabbit.log
go run receive_logs.go
go run send_logs.go
sudo rabbitmqctl list_bindings