交换机详解
Send MessageReceive MessageRabbitMQ ServerRoutingKey
交换机类型
一般有这四种:
- Direct:直连方式,相当于一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列
- Fanout: 广播方式,投递到此类型交换机的消息将下发到所有绑定的队列
- Topic:提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列
- Headers
Headers Exchange
交换机属性
Name:
Type:
Durability:
Auto Delete:
Internal:
Arguments:
注意
(AMQP default)autoDeleteexclusiveautoDeleteexclusive
Direct Exchange
Direct ExchangeRouteKey必须完全匹配
Direct Exchange
生产端
指定投递的Exchange和相应的RontingKey进行发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key
exchange := "test_direct_exchange"
routingKey := "test.direct"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
//注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。
err = ch.ExchangeDeclare(
exchange, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
msg := "Hello World RabbitMQ Direct Exchange Message ... "
// 5. 发送消息
//exchange:要发送到的交换机名称,对应图中exchangeName。
//key:路由键,对应图中RoutingKey。
//mandatory:消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式, 设置为 true 表示将消息返回到生产者,否则直接丢弃消息。直接false,不建议使用。
//immediate :参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。直接false,不建议使用。RabbitMQ 3.0版本开始去掉了对immediate参数的支持。
//msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
err = ch.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", msg)
}
消费端
- 声明一个直连交换机
- 声明队列
- 建立交换机和队列的绑定关系
- 消费者监听队列,消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key,queue name
exchange := "test_direct_exchange"
routingKey := "test.direct"
queueName := "test_direct_queue"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.ExchangeDeclare(
exchange, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 5. 声明(创建)一个队列
//name:队列名称
//durable:是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。)
//autoDelete:是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。
//exclusive:是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 6. 队列绑定
//name:队列名称
//key:BandingKey,表示要绑定的键。
//exchange:交换器名称
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 7. RMQ Server主动把消息推给消费者
//queue:队列名称。
//consumer:消费者标签,用于区分不同的消费者。
//autoAck:是否自动回复ACK,true为是,回复ACK表示告诉服务器我收到消息了。建议为false,手动回复,这样可控性强。
//exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
//noLocal:如果为true,表示生产者和消费者不能是同一个connect。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
运行说明
先启动消费端,刷新管控台,在Exchange目录下可以看到我们声明的exchange以及type
点击该exchange可以看到和队列的绑定关系
然后启动生产端,此时消费端控制台进行了打印,共消费了两条消息,说明监听的两个队列都接收到了消息。
1
Hello World RabbitMQ Direct Exchange Message ...
如果修改值为:test.direct111,此时在启动生产端,消费端就收不到消息了,这就是直连的方式。
补充
一个交换机可以有多个routing key。一个队列也可以被多个rouing key所绑定,一个routing也可以绑定多个队列。例如,当一个routing key绑定2个队列时,此时如果有生产者投递一个消息到该交换机的该routing key,此时2个队列都会存入这个消息。
一个队列多个routing key
binding keyrouting key
下图能够很好的描述这个场景:
directorangeblackgreen
orangeblackgreen
一个routing key多个队列
也称之为多个绑定(Multiple bindings)
blackdirectfanoutblack
Topic Exchange
Topic Exchange
注意:可以使用通配符进行模糊匹配
1
2
3
4
符号 "#" 匹配一个或多个词
符号 "*" 匹配不多不少一个词
例如: "log.#" 能够匹配到 "log.info.aa"
"log.*" 只会匹配到 "log.erro"
图解
Topic Exchangeusa.##.news#.weathereurope.#
usa.usa.news
生产端
指定投递的Exchange和相应的RontingKey进行发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key
exchange := "test_topic_exchange"
routingKey1 := "user.save"
routingKey2 := "user.update"
routingKey3 := "user.delete.abc"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
//注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
msg := "Hello World RabbitMQ Topic Exchange Message ... "
// 5. 发送消息
//exchange:要发送到的交换机名称,对应图中exchangeName。
//key:路由键,对应图中RoutingKey。
//mandatory:消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式, 设置为 true 表示将消息返回到生产者,否则直接丢弃消息。直接false,不建议使用。
//immediate :参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。直接false,不建议使用。RabbitMQ 3.0版本开始去掉了对immediate参数的支持。
//msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
err = ch.Publish(
exchange, // exchange
routingKey1, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
err = ch.Publish(
exchange, // exchange
routingKey2, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
err = ch.Publish(
exchange, // exchange
routingKey3, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", msg)
}
消费端
Topic Exchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key,queue name
exchange := "test_topic_exchange"
routingKey := "user.#"
//routingKey := "user.*"
queueName := "test_topic_queue"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 5. 声明(创建)一个队列
//name:队列名称
//durable:是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。)
//autoDelete:是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。
//exclusive:是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 6. 队列绑定
//name:队列名称
//key:BandingKey,表示要绑定的键。
//exchange:交换器名称
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 7. RMQ Server主动把消息推给消费者
//queue:队列名称。
//consumer:消费者标签,用于区分不同的消费者。
//autoAck:是否自动回复ACK,true为是,回复ACK表示告诉服务器我收到消息了。建议为false,手动回复,这样可控性强。
//exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
//noLocal:如果为true,表示生产者和消费者不能是同一个connect。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
运行说明
#
1
2
3
Hello World RabbitMQ Topic Exchange Message ...
Hello World RabbitMQ Topic Exchange Message ...
Hello World RabbitMQ Topic Exchange Message ...
routingKey = "user.*";Unbind
user.delete.abc*Topic Exchange
Fanout Exchange
- 不处理路由键,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的(性能最好)
Fanout Exchange
生产端
Fanout Exchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key
exchange := "test_fanout_exchange"
routingKey := "asdad" // 随便写的
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
//注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。
err = ch.ExchangeDeclare(
exchange, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
msg := "Hello World RabbitMQ Fanout Exchange Message ... "
// 5. 发送消息
//exchange:要发送到的交换机名称,对应图中exchangeName。
//key:路由键,对应图中RoutingKey。
//mandatory:消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式, 设置为 true 表示将消息返回到生产者,否则直接丢弃消息。直接false,不建议使用。
//immediate :参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。直接false,不建议使用。RabbitMQ 3.0版本开始去掉了对immediate参数的支持。
//msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
err = ch.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", msg)
}
消费端
Fanout Exchange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key,queue name
exchange := "test_fanout_exchange"
routingKey := "" //不需要设置routing key
queueName := "test_fanout_queue"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.ExchangeDeclare(
exchange, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 5. 声明(创建)一个队列
//name:队列名称
//durable:是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。)
//autoDelete:是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。
//exclusive:是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 6. 队列绑定
//name:队列名称
//key:BandingKey,表示要绑定的键。
//exchange:交换器名称
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 7. RMQ Server主动把消息推给消费者
//queue:队列名称。
//consumer:消费者标签,用于区分不同的消费者。
//autoAck:是否自动回复ACK,true为是,回复ACK表示告诉服务器我收到消息了。建议为false,手动回复,这样可控性强。
//exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
//noLocal:如果为true,表示生产者和消费者不能是同一个connect。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
Fanout Exchange
1
Hello World RabbitMQ Fanout Exchange Message ...