目录
前提
采用RabbitMQ官方延迟插件,实现延时队列
交换机类型必须采用 x-delayed-message
必须安装官方延长队列插件(x-delayed-message)
集群 安装插件
若集群安装插件节点类型必须采用disc
废话不多说直接上干货
代码如下
生产者
/**
* @Description: 生产者
* @File: producter
* @Version: 1.0.0
* @Date: 2022/4/27 8:37
*/
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func main() {
conn, err := amqp.Dial("amqp://root:123456@127.0.0.1:5672/test")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
body string
)
// 申请交换机
err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
"x-delayed-type": "direct",
})
if err != nil {
failOnError(err, "交换机申请失败!")
return
}
if err = ch.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
failOnError(err, "绑定交换机失败!")
return
}
body = "==========10000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange, // exchange 这里为空则不选择 exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "10000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
body = "==========20000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange, // exchange 这里为空则不选择 exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "20000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
body = "==========5000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange, // exchange 这里为空则不选择 exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
消费者
/**
* @Description: 消费者
* @File: consumer
* @Version: 1.0.0
* @Date: 2022/4/27 8:38
*/
package main
import (
"github.com/streadway/amqp"
"log"
)
// amqp://root:ty123456@172.16.50.136:5672/lidong
func main() {
// 建立链接
conn, err := amqp.Dial("amqp://root:123456@127.0.0.1:5672/test")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
)
// 申请交换机
err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
"x-delayed-type": "direct",
})
if err != nil {
failOnError(err, "交换机申请失败!")
return
}
// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
q, err := ch.QueueDeclare(
queue, // name
true, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name, 这里指的是 test_logs
routingKey, // routing key
exchange, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 这里监听的是 test_logs
msgs, err := ch.Consume(
q.Name, // queue name, 这里指的是 test_logs
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("接收数据 [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}