如是有了如下思路:
消息进入队列前, header 默认有参数 retry_num=0 表示尝试次数;
消费者在消费时候的, 如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路 ;
当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);
这样消息就又回到原始消费队列尾部了;
最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;
db日志可以记录 交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)
这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)
图解:
附上代码
git clone rabbitMQ
send.go 消费者
package main
import (
"fmt"
_ "fmt"
"#34;
)
func main() {
for i := 0;i<20;i++{
body := fmt.Sprintf("{\"order_id\":%d}",i)
fmt.Println(body)
/**
使用默认的交换机
如果是默认交换机
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
dns string //链接地址
}
如果你喜欢使用默认交换机
Rt Key 此处建议填写成 RtKey 和 QuName 一样的值
*/
queueExchange := rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
" direct ",
"amqp://guest:guest@192.168.1.169:5672/",
}
_ = rabbitmq .Send(queueExchange,body)
}
}
recv .go消费者
package main
import (
"fmt"
"#34;
"time"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/func (t *RecvPro) Consumer(data byte []byte) error {
time.Sleep(time.Second*1)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
//time.Sleep(1*time.Second)
//return errors.New("顶顶顶顶")
return nil
}
//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
*/func (t *RecvPro) FailAction(err error,dataByte []byte) error {
fmt.Println(string(dataByte))
fmt.Println(err)
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送 钉钉 消息通知主人")
return nil
}
func main() {
processTask := &RecvPro{}
/*
runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了
maxTryConnTimeFromMinute:表示最大尝试时间 分钟
*/ err := rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.1.169:5672/",
},
processTask,4,2)
if(err != nil){
fmt.Println(err)
}
}
recv.go消费者
package main
import (
"fmt"
"#34;
"time"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/func (t *RecvPro) Consumer(dataByte []byte) error {
time.Sleep(time.Second*1)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
//time.Sleep(1*time.Second)
//return errors.New("顶顶顶顶")
return nil
}
//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
*/func (t *RecvPro) FailAction(err error,dataByte []byte) error {
fmt.Println(string(dataByte))
fmt.Println(err)
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
func main() {
processTask := &RecvPro{}
/*
runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了
maxTryConnTimeFromMinute:表示最大尝试时间 分钟
*/ err := rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.1.169:5672/",
},
processTask,4,2)
if(err != nil){
fmt.Println(err)
}
}
utils/rabbitmq包
package rabbitmq
import (
"errors"
" strconv "
"time"
//"errors"
"fmt"
"github.com/streadway/amqp"
"log"
)
// 定义 全局变量 ,指针类型
var mq Conn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
MsgContent() string
}
// 定义生产者接口
type RetryProducer interface {
MsgContent() string
}
// 定义接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction(error , []byte) error
}
// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection
Channel *amqp.Channel
dns string
QueueName string // 队列名称
RoutingKey string // key名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
producerList []Producer
retryProducerList []RetryProducer
receiverList []Receiver
}
// 定义队列交换机对象
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
mqConn, err = amqp.Dial(r.dns)
r.connection = mqConn // 赋值给RabbitMQ对象
if err != nil {
fmt.Printf("rbmq链接失败 :%s \n", err )
}
return
}
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
err = r.connection.Close()
if err != nil{
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return
}
// 链接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
mqConn := r.connection
r.Channel, err = mqConn.Channel()
//defer mqChan.Close()
if err != nil {
fmt.Printf("MQ打开管道失败:%s \n", err)
}
return err
}
// 链接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
r.Channel.Close()
if err != nil {
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return err
}
// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
return RabbitMQ{
QueueName:q.QuName,
RoutingKey:q.RtKey,
ExchangeName: q.ExName,
ExchangeType: q.ExType,
dns:q.Dns,
}
}
func (mq *RabbitMQ) sendMsg (body string) (err error) {
err = mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer func() {
_ = mq.Channel.Close()
}()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil )
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
if mq.ExchangeName != "" && mq.RoutingKey != ""{
err = mq.Channel.Publish(
mq.ExchangeName, // exchange
mq.RoutingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
DeliveryMode: 2,
})
}else{
err = mq.Channel.Publish(
"", // exchange
mq.QueueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
DeliveryMode: 2,
})
}
return
}
/*
发送延时消息
*/func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
err =mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
return
}
}
if ttl <= 0{
return errors.New("发送延时消息,ttl参数是必须的")
}
table := make(map[string]interface{},3)
table["x-dead-letter-routing-key"] = mq.RoutingKey
table["x-dead-letter-exchange"] = mq.ExchangeName
table["x-message-ttl"] = ttl*1000
//fmt.Printf("%+v",table)
//fmt.Printf("%+v",mq)
// 用于检查队列是否存在,已经存在不需要重复声明
ttlstring := strconv.FormatInt(ttl,10)
queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
_, err = ch.QueueDeclare(queueName, true, false, false, false, table)
if err != nil {
return
}
// 绑定任务
if routingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
if err != nil {
return
}
}
header := make(map[string]interface{},1)
header["retry_nums"] = 0
var ttl_exchange string
var ttl_routkey string
if(mq.ExchangeName != "" ){
ttl_exchange = mq.ExchangeName
}else{
ttl_exchange = ""
}
if mq.RoutingKey != "" && mq.ExchangeName != ""{
ttl_routkey = routingKey
}else{
ttl_routkey = queueName
}
err = mq.Channel.Publish(
ttl_exchange, // exchange
ttl_routkey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
Headers:header,
})
if err != nil {
return
}
return
}
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
//原始路由key
oldRoutingKey := args[0]
//原始交换机名
oldExchangeName := args[1]
table := make(map[string]interface{},3)
table["x-dead-letter-routing-key"] = oldRoutingKey
if oldExchangeName != "" {
table["x-dead-letter-exchange"] = oldExchangeName
}else{
mq.ExchangeName = ""
table["x-dead-letter-exchange"] = ""
}
table["x-message-ttl"] = int64(20000)
//fmt.Printf("%+v",table)
//fmt.Printf("%+v",mq)
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
header := make(map[string]interface{},1)
header["retry_nums"] = retry_nums + int32(1)
var ttl_exchange string
var ttl_routkey string
if(mq.ExchangeName != "" ){
ttl_exchange = mq.ExchangeName
}else{
ttl_exchange = ""
}
if mq.RoutingKey != "" && mq.ExchangeName != ""{
ttl_routkey = mq.RoutingKey
}else{
ttl_routkey = mq.QueueName
}
//fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
err = mq.Channel.Publish(
ttl_exchange, // exchange
ttl_routkey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
Headers:header,
})
if err != nil {
fmt.Printf("MQ任务发送失败:%s \n", err)
}
}
// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
// 获取消费通道,确保rabbitMQ一个一个发送消息
err = ch.Qos(1, 0, false)
msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil)
if err != nil {
log.Printf("Consume err :%s \n", err)
}
for msg := range msgList {
retry_nums,ok := msg.Headers["retry_nums"].(int32)
if(!ok){
retry_nums = int32(0)
}
// 处理数据
err := receiver.Consumer(msg.Body)
if err!=nil {
//消息处理失败 进入延时尝试机制
if retry_nums < 3{
fmt.Println(string(msg.Body))
fmt.Printf("消息处理失败 消息开始进入尝试 ttl延时队列 \n")
retry_msg(msg.Body,retry_nums,QueueExchange{
mq.QueueName,
mq.RoutingKey,
mq.ExchangeName,
mq.ExchangeType,
mq.dns,
})
}else{
//消息失败 入库db
fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")
receiver.FailAction(err,msg.Body)
}
err = msg.Ack(true)
if err != nil {
fmt.Printf("确认消息未完成异常:%s \n", err)
}
}else {
// 确认消息,必须为false
err = msg.Ack(true)
if err != nil {
fmt.Printf("消息消费ack失败 err :%s \n", err)
}
}
}
}
//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
//原始队列名称 交换机名称
oldQName := queueExchange.QuName
oldExchangeName := queueExchange.ExName
oldRoutingKey := queueExchange.RtKey
if oldRoutingKey == "" || oldExchangeName == ""{
oldRoutingKey = oldQName
}
if queueExchange.QuName != "" {
queueExchange.QuName = queueExchange.QuName + "_retry_3";
}
if queueExchange.RtKey != "" {
queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
}else{
queueExchange.RtKey = queueExchange.QuName + "_retry_3";
}
//fmt.Printf("%+v",queueExchange)
mq := NewMq(queueExchange)
_ = mq.MqConnect()
defer func(){
_ = mq.CloseMqConnect()
}()
//fmt.Printf("%+v",queueExchange)
mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
}
func Send(queueExchange QueueExchange,msg string) (err error){
mq := NewMq(queueExchange)
err = mq.MqConnect()
if err != nil{
return
}
defer func(){
_ = mq.CloseMqConnect()
}()
err = mq.sendMsg(msg)
return
}
//发送延时消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
mq := NewMq(queueExchange)
err = mq.MqConnect()
if err != nil{
return
}
defer func(){
_ = mq.CloseMqConnect()
}()
err = mq.sendDelayMsg(msg,ttl)
return
}
/*
runNums 开启并发执行任务数量
*/func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){
var (
exitTask bool
maxTryConnNums int //rbmq链接失败后多久尝试一次
runNums int
maxTryConnTimeFromMinute int
)
if(len(otherParams) <= 0){
runNums = 1
maxTryConnTimeFromMinute = 0
}else if(len(otherParams) == 1){
runNums = otherParams[0]
maxTryConnTimeFromMinute = 0
}else if(len(otherParams) == 2){
runNums = otherParams[0]
maxTryConnTimeFromMinute = otherParams[1]
}
//maxTryConnNums := 360 //rbmq链接失败后最大尝试次数
//maxTryConnTime := time.Duration(10) //rbmq链接失败后多久尝试一次
maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq链接失败后最大尝试次数
maxTryConnTime := time.Duration(6) //rbmq链接失败后多久尝试一次
mq := NewMq(queueExchange)
//链接rabbitMQ
err = mq.MqConnect()
if(err != nil){
return
}
defer func() {
if panicErr := recover(); panicErr != nil{
fmt.Println(recover())
err = errors.New(fmt.Sprintf("%s",panicErr))
}
}()
//rbmq断开链接后 协程退出释放信号
taskQuit:= make(chan struct{}, 1)
//尝试链接rbmq
tryToLinkC := make(chan struct{}, 1)
//最大尝试次数
tryToLinkMaxNums := make(chan struct{}, 1)
maxTryNums := 0 //尝试重启次数
//开始执行任务
for i:=1;i<=runNums;i++{
go Recv2(mq,receiver,taskQuit);
}
//如果rbmq断开连接后 尝试重新建立链接
var tryToLink = func() {
for {
maxTryNums += 1
err = mq.MqConnect()
if(err == nil){
tryToLinkC <- struct{}{}
break
}
if(maxTryNums > maxTryConnNums){
tryToLinkMaxNums <- struct{}{}
break
}
//如果链接断开了 10秒重新尝试链接一次
time.Sleep(time.Second * maxTryConnTime)
}
return
}
scheduleTimer := time.NewTimer(time.Millisecond*300)
exitTask = true
for{
select {
case <-tryToLinkC: //建立链接成功后 重新开启协程执行任务
fmt.Println("重新开启新的协程执行任务")
go Recv2(mq,receiver,taskQuit);
case <-tryToLinkMaxNums://rbmq超出最大链接次数 退出任务
fmt.Println("rbmq链接超过最大尝试次数!")
exitTask = false
err = errors.New("rbmq链接超过最大尝试次数!")
case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
fmt.Println("rbmq断开连接后 开始尝试重新建立链接")
go tryToLink()
case <- scheduleTimer.C:
//fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~")
}
// 重置调度间隔
scheduleTimer.Reset(time.Millisecond*300)
if !exitTask{
break
}
}
fmt.Println("exit")
return
}
func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
defer func() {
fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
taskQuit <- struct{}{}
return
}()
// 验证链接是否正常
err := mq.MqOpenChannel()
if(err != nil){
return
}
mq.ListenReceiver(receiver)
}
type retryPro struct {
msgContent string
}
二,延时队列 场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:
实现方案
- 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
- pcntl_alarm为进程设置一个闹钟信号
- swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)
- rabbitmq延迟任务
以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现
生产者:
1 <?php
2 require_once __DIR__ . '/../vendor/autoload.php';
3 use PhpAmqpLib\Connection\AMQPStreamConnection;
4 use PhpAmqpLib\Message\AMQPMessage;
5
6
7 $queue = "test_ack_queue";
8 $exchange = "test_ack_queue";
9 //获取连接
10 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
11 //从连接中创建通道
12 $channel = $connection->channel();
13
14 $channel->exchange_declare('delay_exchange', 'direct',false,true,false);
15 $channel->exchange_declare('cache_exchange', 'direct',false,true,false);
16
17 $tale = new \PhpAmqpLib\Wire\AMQPTable();
18 $tale->set('x-dead-letter-exchange', 'delay_exchange');
19 $tale->set('x-dead-letter-routing-key','delay_exchange');
20 //$tale->set('x-message-ttl',10000);
21
22 $channel->queue_declare('cache_queue',false,true,false,false,false,$tale);
23 $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');
24
25 $channel->queue_declare('delay_queue',false,true,false,false,false);
26 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
27
28
29 $msg = new AMQPMessage('Hello World',array(
30 'expiration' => 10000,
31 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
32
33 ));
34
35 $channel->basic_publish($msg,'cache_exchange','cache_exchange');
36 echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;
37
38
39
40
41 //while ($wait) {
42 // $channel->wait();
43 //}
44
45 $channel->close();
46 $connection->close();
task
消费者:
1 <?php
2 require_once __DIR__ . '/../vendor/autoload.php';
3 use PhpAmqpLib\Connection\AMQPStreamConnection;
4 use PhpAmqpLib\Message\AMQPMessage;
5
6
7 //获取连接
8 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
9 //从连接中创建通道
10 $channel = $connection->channel();
11
12
13 //$channel->queue_declare($queue, false, true, false, false);
14 //$channel->exchange_declare($exchange, 'topic', false, true, false);
15 //$channel->queue_bind($queue, $exchange);
16
17
18
19 $channel->exchange_declare('delay_exchange', 'direct',false,false,false);
20 $channel->queue_declare('delay_queue',false,true,false,false,false);
21 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
22
23
24
25 function process_message(AMQPMessage $message)
26 {
27 $headers = $message->get('application_headers');
28 $nativeData = $headers->getNativeData();
29 // var_dump($nativeData['x-delay']);
30 echo date('Y-m-d H:i:s')." [x] Received",$message->body,PHP_EOL;
31 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
32
33 }
34
35
36 $channel->basic_qos(null, 1, null);
37 $channel->basic_consume('delay_queue', '', false, false, false, false, 'process_message');
38
39 function shutdown($channel, $connection)
40 {
41 $channel->close();
42 $connection->close();
43 }
44 register_shutdown_function('shutdown', $channel, $connection);
45
46 while (count($channel->callbacks)) {
47 $channel->wait();
48 }
work
延时队列实现和上面所讲的消息重试有异曲同工之处,都是利用了延时时间和死信队列这一特性实现
最新源码仓库地址:
其它:该rabbitmq包实现中包含了,rabbitmq断线重连,有兴趣的同学可以看看(重试和重连接是两个概念)
重连接 :rabbitmq链接失败导致任务失败,此时要等待rabbitmq服务器恢复正常后才能再次启动协程处理任务
重试:rabbitmq服务正常,消息消费进程也正常,但是消息处理失败。尝试多次消费消息后还是失败就ack消息,在整个重试过程中不会阻塞消费
golang监听rabbitmq消息队列任务断线自动重连接: