Posted 吴冬冬
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang使用rabbitmqTopic模式相关的知识,希望对你有一定的参考价值。
简介directfanoutdirect
比如linux的日志系统,它不仅分级别(info/warn/crit…),而且还分来源设备(auth/cron/kern…)。
我们需要一个非常复杂度组合,比如想接受来自cron的critical日志和来自kern的所有日志。
当然topic交换器可以实现这一切。
Topic交换器.
binding key也是类似的格式,topic交换器的绑定逻辑和redict一样,根据routing_key把信息发往所有绑定了对应banding key的队列中。
*代替一个词,#代表0个和多个词
topic交换器非常强大,它可以替代其他交换器
#
*#
示例
发送者
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"strings"
)
func main()
conn, err := amqp.Dial("amqp://admin:rabbitmq123@xxx.xxx.xxx.xxx:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare("logs_topic", amqp.ExchangeTopic, true, false, false, false, nil)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish("logs_topic", severityFrom(os.Args), false, false, amqp.Publishing
ContentType: "text/plain",
Body: []byte(body),
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
func severityFrom(args []string) string
if len(args) > 1
return args[1]
return "anonymous.info"
func bodyFrom(args []string) string
if len(args) > 2
return strings.Join(args[2:], " ")
return "hello"
func failOnError(err error, msg string)
if err != nil
log.Fatalf("%s: %s", msg, err)
接受者
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func main()
conn, err := amqp.Dial("amqp://admin:rabbitmq123@18.232.146.30:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare("logs_topic", amqp.ExchangeTopic, true, false, false, false, nil)
failOnError(err, "Failed to declare an exchange")
q,err:=ch.QueueDeclare("",false,false,false,false,nil)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
for _, key := range os.Args[1:]
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", key)
err=ch.QueueBind(q.Name,key,"logs_topic",false,nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
go func()
for msg := range msgs
log.Printf("Received a message: %s", msg.Body)
()
<-make(chan bool)
func failOnError(err error, msg string)
if err != nil
log.Fatalf("%s: %s", msg, err)
以上是关于golang使用rabbitmqTopic模式的主要内容,如果未能解决你的问题,请参考以下文章