目录
TTL死信队列
延迟队列
Go实现延迟队列
TTL
对消息设置过期时间。对整个队列(Queue)设置过期时间。
如何设置
x-message-ttl
expiration
如果两者都设置了过期时间,以时间短的为准。
在streadway/amqp库提供的API中设置TTL
QueueDeclareamqp.Tabletype Table map[string]interface{}
// 设置Queue ttl为5s args := amqp.Table{"x-message-ttl": 5000} q, e := ch.QueueDeclare( name, //队列名 false, true, false, false, args, //设置Queue ttl为5s )
e = q.channel.Publish( "", queue, false, false, amqp.Publishing{ // 设置当前发送消息的过期时间为3s Expiration: "3000", ReplyTo: q.Name, Body: []byte(str), })
死信队列
死信DLX
死信就是指没有被消费者消费成功的消息,一条消息变成死信有三种情况:
x-max-length
Reject()requeuefalse
x-message-ttl
如何将死信发送给DLX
为队列设置参数即可,将要发送死信的队列配置以下两个参数:
x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]
下面是死信队列的工作流程:
延迟队列
延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性python操作的业务。
比如十分钟内未支付则取消订单,原先这个功能我们可以使用定时器来实现,即每隔一段时间去数据库对比未支付订单的当前时间与订单创建时间。但是定时器的时长难以确定,太长会导致订单失效时间出现误差,太短则会增大数据库压力。
实现
在RabbitMQ中没有提供延迟队列的功能,但是我们可以使用:TTL+死信队列组合的方式来实现延迟队列的效果。
下面是实现延迟队列的流程图:
func main() { conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/") ch, _ := conn.Channel() body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05") fmt.Println(body) // 发送消息到queue.normal队列中 ch.Publish("", "queue.normal", false, false, amqp.Publishing{ Body: []byte(body), Expiration: "10000", // 设置TTL为10秒 }) defer conn.Close() defer ch.Close() }
func main() { conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/") ch, _ := conn.Channel() //监听queue.dlx队列 msgs, _ := ch.Consume( "queue.dlx", "", true, false, false, false, nil, ) for d := range msgs { fmt.Printf("receive: %s\n", d.Body) // 收到消息,业务处理 } }