目录

前提

采用RabbitMQ官方延迟插件,实现延时队列

交换机类型必须采用  x-delayed-message

必须安装官方延长队列插件(x-delayed-message)

集群  安装插件

   若集群安装插件节点类型必须采用disc

废话不多说直接上干货

代码如下

生产者

/**
* @Description: 生产者
* @File: producter
* @Version: 1.0.0
* @Date: 2022/4/27 8:37
 */

package main

import (
	"github.com/streadway/amqp"
	"log"
	"time"
)

func main() {
	conn, err := amqp.Dial("amqp://root:123456@127.0.0.1:5672/test")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	var (
		exchange   = "x-delayed-message"
		queue      = "delay_queue"
		routingKey = "log_delay"
		body       string
	)
	// 申请交换机
	err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	})
	if err != nil {
		failOnError(err, "交换机申请失败!")
		return
	}
	if err = ch.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
		failOnError(err, "绑定交换机失败!")
		return
	}

	body = "==========10000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
	// 将消息发送到延时队列上
	err = ch.Publish(
		exchange,   // exchange 这里为空则不选择 exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Headers: map[string]interface{}{
				"x-delay": "10000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
			},
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)

	body = "==========20000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
	// 将消息发送到延时队列上
	err = ch.Publish(
		exchange,   // exchange 这里为空则不选择 exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Headers: map[string]interface{}{
				"x-delay": "20000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
			},
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)

	body = "==========5000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
	// 将消息发送到延时队列上
	err = ch.Publish(
		exchange,   // exchange 这里为空则不选择 exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Headers: map[string]interface{}{
				"x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
			},
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

消费者

/**
* @Description: 消费者
* @File: consumer
* @Version: 1.0.0
* @Date: 2022/4/27 8:38
 */

package main

import (
	"github.com/streadway/amqp"
	"log"
)

// amqp://root:ty123456@172.16.50.136:5672/lidong
func main() {
	// 建立链接
	conn, err := amqp.Dial("amqp://root:123456@127.0.0.1:5672/test")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	var (
		exchange   = "x-delayed-message"
		queue      = "delay_queue"
		routingKey = "log_delay"
	)

	// 申请交换机
	err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	})
	if err != nil {
		failOnError(err, "交换机申请失败!")
		return
	}

	// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
	q, err := ch.QueueDeclare(
		queue, // name
		true,  // durable
		true,  // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")
	err = ch.QueueBind(
		q.Name,     // queue name, 这里指的是 test_logs
		routingKey, // routing key
		exchange,   // exchange
		false,
		nil)
	failOnError(err, "Failed to bind a queue")

	// 这里监听的是 test_logs
	msgs, err := ch.Consume(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("接收数据 [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

交换机截图

 队列截图