1. 概述
本文基于 RabbitMQ 的 Go 客户端库 https://github.com/streadway/amqp(AMQP 协议实现)做一层封装,目标如下:
- 保证断线重连
- 生产者保证消息至少一次发送到队列中
- 消费者将Ack交给执行业务函数
- 消费者控制消费协程数量
2. Conn
Conn 封装了 Connection 和 Channel,并负责断线重连。
package rabbitx
import (
"fmt"
"strconv"
"time"
"github.com/streadway/amqp"
)
// Reconnection tuning used when the broker connection is lost.
const (
	// ReconnectTimes is the maximum number of reconnect attempts made after
	// a disconnect.
	ReconnectTimes = 100

	// ReconnectInterval is the base back-off in seconds. The wait after a
	// failed attempt i (zero-based) is (i+1) * ReconnectInterval seconds.
	ReconnectInterval = 3
)
// Conn bundles one AMQP connection with a single channel opened on it.
type Conn struct {
// conn is the underlying AMQP connection returned by amqp.Dial.
conn *amqp.Connection
// ch is the channel opened on conn; producers and consumers in this
// package issue their AMQP calls through it.
ch *amqp.Channel
// connStr is the address string handed to amqp.Dial on every (re)connect.
connStr string
// isConnected is set to true by connect; Publish checks it before sending.
isConnected bool
// recevier (sic) is the channel registered with NotifyClose to learn
// about connection shutdown.
recevier chan *amqp.Error
}
// NewConn dials the broker at connStr and returns a ready-to-use Conn.
//
// If the initial connection attempt fails the error is returned and no
// background work is started. On success a close listener is registered on
// the connection and a goroutine is started to watch it.
func NewConn(connStr string) (*Conn, error) {
	wrapper := new(Conn)
	wrapper.connStr = connStr

	// Fail fast at start-up: an unreachable broker is reported to the caller.
	err := wrapper.connect()
	if err != nil {
		return nil, err
	}

	closed := make(chan *amqp.Error)
	wrapper.recevier = closed
	wrapper.conn.NotifyClose(closed)

	go wrapper.listenConnClose()
	return wrapper, nil
}
// connect dials the broker using c.connStr and opens one channel on the new
// connection.
//
// The Conn is only updated (conn, ch, isConnected) once both steps have
// succeeded. If opening the channel fails, the freshly dialed connection is
// closed before returning so that repeated retries do not leak connections.
func (c *Conn) connect() error {
	conn, err := amqp.Dial(c.connStr)
	if err != nil {
		return err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best effort: the channel error is the one worth reporting, the
		// close error of a half-initialised connection is not actionable.
		_ = conn.Close()
		return err
	}

	c.conn = conn
	c.ch = ch
	c.isConnected = true
	return nil
}
// listenConnClose watches the connection's close notification and
// re-establishes the connection when it is lost.
//
// For every close error received it marks the Conn as disconnected, makes
// sure the old connection is closed, and then retries connect up to
// ReconnectTimes times, sleeping (i+1)*ReconnectInterval seconds after each
// failed attempt. After a successful reconnect a fresh notification channel
// is registered on the NEW connection; the previous channel was bound to the
// old connection and is closed by the library once that connection shuts
// down, so without re-registering only the first disconnect would ever be
// handled.
//
// The goroutine ends when the notification channel is closed without an
// error (the connection was closed deliberately) or when all reconnect
// attempts fail.
//
// NOTE(review): isConnected is written here and read by Publish without
// synchronisation. Registrations made on the old channel (NotifyReturn in
// NewProducer, Consume in Consumer.Consume) are not re-created after a
// reconnect.
func (c *Conn) listenConnClose() {
	for {
		e, ok := <-c.recevier
		if !ok {
			// Closed without an error value: nothing to recover from.
			return
		}

		c.isConnected = false
		fmt.Println("rabbitmq disconnect, reason: " + e.Reason)

		// Make sure the broken connection is really closed before replacing it.
		if !c.conn.IsClosed() {
			// Best effort: the connection is already failing.
			_ = c.conn.Close()
		}

		reconnected := false
		for i := 0; i < ReconnectTimes; i++ {
			fmt.Println("rabbitmq 第" + strconv.Itoa(i) + "次重连")
			if err := c.connect(); err == nil {
				reconnected = true
				break
			}
			time.Sleep(time.Duration(i+1) * ReconnectInterval * time.Second)
		}
		if !reconnected {
			fmt.Println("rabbitmq reconnect failed, giving up")
			return
		}

		// Watch the new connection; the old notification channel is dead.
		c.recevier = make(chan *amqp.Error)
		c.conn.NotifyClose(c.recevier)
	}
}
3. Producer
Producer 绑定到一个 Exchange,如果该 Exchange 不存在则创建。
package rabbitx
import (
"errors"
"fmt"
"time"
"github.com/streadway/amqp"
)
// Producer publishes messages to one exchange with a fixed routing key.
type Producer struct {
// c is the connection wrapper whose channel is used for publishing.
c *Conn
// exchange is the name of the exchange messages are published to.
exchange string
// routingKey is the routing key used for every published message.
routingKey string
// basicReturn receives messages handed back by the broker; it is
// registered with NotifyReturn in NewProducer.
basicReturn chan amqp.Return
}
// NewProducer creates a Producer that publishes to exchange with routingKey.
//
// The exchange is declared with type exType (durable, not auto-deleted, not
// internal, waiting for the broker's confirmation), which creates it when it
// does not exist yet. A return listener is registered on the channel and a
// goroutine is started to handle returned messages.
//
// NewProducer panics if the exchange cannot be declared.
func NewProducer(conn *Conn, exchange string, routingKey string, exType string) *Producer {
	producer := &Producer{
		c:          conn,
		exchange:   exchange,
		routingKey: routingKey,
	}

	// Arguments after exType: durable, autoDelete, internal, noWait, args.
	declareErr := conn.ch.ExchangeDeclare(exchange, exType, true, false, false, false, nil)
	if declareErr != nil {
		panic(declareErr)
	}

	// Messages that could not be routed to a queue come back on this channel.
	returns := make(chan amqp.Return)
	producer.basicReturn = returns
	conn.ch.NotifyReturn(returns)

	go producer.listenPubReturn()
	return producer
}
// listenPubReturn handles messages the broker returned to this producer.
//
// Each returned message is logged and its body is published again through
// Publish, followed by a 3 second pause before the next return is handled.
// A failure to republish is logged instead of being silently discarded, so a
// message dropped while the connection is down leaves a trace.
//
// The loop ends when p.basicReturn is closed.
func (p *Producer) listenPubReturn() {
	for r := range p.basicReturn {
		fmt.Println("return msg: " + string(r.Body))
		if err := p.Publish(r.Body); err != nil {
			fmt.Println("republish returned msg failed: " + err.Error())
		}
		// Throttle retries so an unroutable message does not spin the loop.
		time.Sleep(3 * time.Second)
	}
}
// Publish sends msg as the body of a message to the producer's exchange
// using its routing key.
//
// It returns an error without contacting the broker when the underlying
// Conn is not marked as connected; otherwise it returns whatever the channel
// publish call returns. The message is published with the mandatory flag
// set and the immediate flag cleared.
func (p *Producer) Publish(msg []byte) error {
	connected := p.c.isConnected
	if !connected {
		return errors.New("conn is disconnected")
	}

	const (
		mandatory = true
		immediate = false
	)
	payload := amqp.Publishing{Body: msg}
	return p.c.ch.Publish(p.exchange, p.routingKey, mandatory, immediate, payload)
}
Publish 的 mandatory 参数必须设为 true,无法路由到队列的消息才会被 Broker 退回给生产者。
4. Consumer
Consumer 使用 ants 协程池来控制消费协程的数量。
package rabbitx
import (
"fmt"
"github.com/panjf2000/ants/v2"
"github.com/streadway/amqp"
)
const AntPoolSize = 100 // maximum number of goroutines in the consumer pool
// Consumer consumes deliveries from a single queue.
type Consumer struct {
// c is the connection wrapper whose channel is used for declaring and
// consuming.
c *Conn
// queueName is the queue that Consume reads from.
queueName string
}
// NewConsumer creates a Consumer for queueName and prepares the broker
// topology it needs.
//
// It declares the exchange (type exType, durable), declares the queue
// (durable), and binds the queue to the exchange with routingKey. Each
// declaration creates the entity when it does not exist yet.
//
// NewConsumer panics if any of the three steps fails.
func NewConsumer(conn *Conn, exchange string, routingKey string, exType string, queueName string) *Consumer {
	consumer := new(Consumer)
	consumer.c = conn
	consumer.queueName = queueName

	// Exchange: durable, not auto-deleted, not internal, wait for the broker.
	declareErr := conn.ch.ExchangeDeclare(exchange, exType, true, false, false, false, nil)
	if declareErr != nil {
		panic(declareErr)
	}

	// Queue: durable, not auto-deleted, not exclusive, wait for the broker.
	queue, queueErr := conn.ch.QueueDeclare(queueName, true, false, false, false, nil)
	if queueErr != nil {
		panic(queueErr)
	}

	// Route messages carrying routingKey from the exchange into the queue.
	bindErr := conn.ch.QueueBind(queue.Name, routingKey, exchange, false, nil)
	if bindErr != nil {
		panic(bindErr)
	}

	return consumer
}
// Consume starts consuming from the consumer's queue and hands every
// delivery to dofunc on a goroutine pool of size AntPoolSize.
//
// Deliveries are requested with auto-ack disabled, so dofunc is responsible
// for acknowledging each delivery it receives. Consume blocks until the
// delivery channel is closed and then returns nil. It returns an error if
// consuming cannot be started or the pool cannot be created. A failure to
// submit a single delivery to the pool is only logged.
func (cu *Consumer) Consume(dofunc func(d amqp.Delivery)) error {
	// Arguments after the empty consumer tag: autoAck, exclusive, noLocal,
	// noWait, args.
	deliveries, err := cu.c.ch.Consume(cu.queueName, "", false, false, false, false, nil)
	if err != nil {
		return err
	}

	// The pool bounds how many deliveries are processed concurrently.
	worker := func(arg interface{}) {
		dofunc(arg.(amqp.Delivery))
	}
	pool, err := ants.NewPoolWithFunc(AntPoolSize, worker)
	if err != nil {
		fmt.Printf("ants pool error:%s", err.Error())
		return err
	}
	defer pool.Release()

	for {
		delivery, open := <-deliveries
		if !open {
			return nil
		}
		if invokeErr := pool.Invoke(delivery); invokeErr != nil {
			fmt.Printf("ants pool error:%s", invokeErr.Error())
		}
	}
}
5. 总结
Exchange-RoutingKey-Queue
- 生产者,发送消息失败;而自身崩溃了导致消息丢失
- 消费者,消费消息自身崩溃导致消息重复消费,需要执行函数来过滤