在工作中发现,有些时候消息因为某些原因在消费一次后,如果消息失败,这时候不ack,消息就回一直重回队列首部,造成消息拥堵。
一:消息重试机制
如是有了如下思路:
消息进入队列前,header默认有参数 retry_num=0 表示尝试次数;
消费者在消费时候的,如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路;
当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);
这样消息就又回到原始消费队列尾部了;
最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;
db日志可以记录交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)
这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)
图解:
附上代码
git clone https://github.com/sunlongv520/golang-rabbitmq
send.go 消费者
View Code
recv.go消费者
View Code
utils/rabbitmq包
View Code
二,延时队列
场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:
实现方案
- 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
- pcntl_alarm为进程设置一个闹钟信号
- swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)
- rabbitmq延迟任务
以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现
生产者:
View Code
消费者:
View Code
延时队列实现和上面所讲的消息重试有异曲同工之处,都是利用了延时时间和死信队列这一特性实现
其它:该rabbitmq包实现中包含了,rabbitmq断线重连,有兴趣的同学可以看看
(重试和重连接是两个概念)
重连接 :rabbitmq链接失败导致任务失败,此时要等待rabbitmq服务器恢复正常后才能再次启动协程处理任务
重试:rabbitmq服务正常,消息消费进程也正常,但是消息处理失败。尝试多次消费消息后还是失败就ack消息,在整个重试过程中不会阻塞消费
golang监听rabbitmq消息队列任务断线自动重连接: