1. 概述

本文基于 RabbitMQ 的 Go 客户端库 https://github.com/streadway/amqp(AMQP 协议实现)进行封装,实现以下目标:
  • 保证断线重连
  • 生产者保证消息至少一次发送到队列中
  • 消费者将Ack交给执行业务函数
  • 消费者控制消费协程数量

2. Conn

Conn 封装了 amqp 的 Connection 与 Channel,并负责断线重连:
package rabbitx

import (
	"fmt"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

// ReconnectTimes is the maximum number of reconnect attempts made after a
// connection close is reported.
const ReconnectTimes = 100

// ReconnectInterval is the base reconnect delay in seconds. The wait after
// failed attempt i (0-based) is (i+1)*ReconnectInterval seconds, i.e. the
// delay grows linearly with the attempt number.
const ReconnectInterval = 3

// Conn bundles one AMQP connection with a single channel opened on it, plus
// the state used to notice a lost connection and reconnect.
type Conn struct {
	// conn is the connection returned by amqp.Dial.
	conn *amqp.Connection
	// ch is the channel opened on conn; Producer and Consumer operate on it.
	ch   *amqp.Channel

	// connStr is the URL handed to amqp.Dial on every (re)connect.
	connStr     string
	// isConnected is set to true by connect once dialing and channel creation succeed.
	isConnected bool
	// recevier is registered with conn.NotifyClose in NewConn and drained by listenConnClose.
	recevier    chan *amqp.Error
}

// NewConn dials the broker at connStr and returns a Conn whose close
// notifications are watched by a background goroutine.
//
// If the initial connection cannot be established the error is returned and
// no goroutine is started.
func NewConn(connStr string) (*Conn, error) {
	c := &Conn{
		connStr:  connStr,
		recevier: make(chan *amqp.Error),
	}

	// Fail fast at startup: an unreachable broker is reported to the caller.
	err := c.connect()
	if err != nil {
		return nil, err
	}

	// Subscribe to close events before starting the listener that drains them.
	c.conn.NotifyClose(c.recevier)
	go c.listenConnClose()

	return c, nil
}

// connect dials the broker at c.connStr and opens a channel on the new
// connection.
//
// c.conn, c.ch and c.isConnected are updated only when both steps succeed.
// On failure the error of the failing step is returned unchanged and the
// receiver's fields are left as they were.
func (c *Conn) connect() error {
	conn, err := amqp.Dial(c.connStr)
	if err != nil {
		return err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Do not leak the freshly dialed connection when no channel can be
		// opened on it. The close error is deliberately dropped: the channel
		// error is the one the caller needs to see.
		_ = conn.Close()
		return err
	}

	c.conn = conn
	c.ch = ch
	c.isConnected = true
	return nil
}

// listenConnClose waits for close notifications on c.recevier and tries to
// re-establish the connection.
//
// For every reported error it marks the Conn as disconnected, makes sure the
// old connection is closed, and then calls connect up to ReconnectTimes times,
// sleeping (i+1)*ReconnectInterval seconds after failed attempt i. After a
// successful reconnect a fresh notification channel is registered on the new
// connection so that later disconnects are handled as well.
//
// The goroutine returns when the notification channel is closed without an
// error, or when every reconnect attempt has failed.
//
// NOTE(review): c.isConnected is written here and read by other goroutines
// without synchronization; consumers and return listeners that were set up on
// the previous channel are not re-created by this method.
func (c *Conn) listenConnClose() {
	for {
		e, ok := <-c.recevier
		if !ok {
			// Channel closed with no error delivered: nothing to recover from.
			return
		}
		fmt.Println("rabbitmq disconnect, reason: " + e.Reason)

		// Let publishers see the outage until connect succeeds again.
		c.isConnected = false

		// Make sure the broken connection is fully torn down before redialing.
		if !c.conn.IsClosed() {
			// Best effort: the connection is already failing, so a close
			// error carries no extra information.
			_ = c.conn.Close()
		}

		reconnected := false
		for i := 0; i < ReconnectTimes; i++ {
			fmt.Println("rabbitmq 第" + strconv.Itoa(i) + "次重连")
			if err := c.connect(); err == nil {
				reconnected = true
				break
			}

			time.Sleep(time.Duration(i+1) * ReconnectInterval * time.Second)
		}
		if !reconnected {
			fmt.Println("rabbitmq reconnect failed after " + strconv.Itoa(ReconnectTimes) + " attempts")
			return
		}

		// The previous notification channel belongs to the dead connection;
		// register a new one on the connection that connect just created.
		c.recevier = make(chan *amqp.Error)
		c.conn.NotifyClose(c.recevier)
	}
}

3. Producer

Producer 向指定的 Exchange 发送消息;创建时会声明该 Exchange,若不存在则创建:
package rabbitx

import (
	"errors"
	"fmt"
	"time"

	"github.com/streadway/amqp"
)

// Producer publishes messages to one exchange with a fixed routing key over
// the channel owned by a shared Conn.
type Producer struct {
	// c is the shared connection wrapper whose channel is used for publishing.
	c          *Conn
	// exchange is the exchange name passed to ExchangeDeclare and Publish.
	exchange   string
	// routingKey is the routing key passed to Publish.
	routingKey string

	// basicReturn is registered via NotifyReturn in NewProducer and drained
	// by listenPubReturn.
	basicReturn chan amqp.Return
}

// NewProducer builds a Producer that publishes to exchange with routingKey
// over conn's channel.
//
// The exchange is declared with kind exType (created if it does not exist);
// a declaration failure panics. A background goroutine is started to handle
// messages returned by the broker.
func NewProducer(conn *Conn, exchange string, routingKey string, exType string) *Producer {
	p := &Producer{
		c:          conn,
		exchange:   exchange,
		routingKey: routingKey,
	}

	// Declare the exchange so that publishing never targets a missing one.
	err := p.c.ch.ExchangeDeclare(p.exchange, exType, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// Subscribe to returned messages, i.e. those that could not be delivered
	// to a queue, and process them in the background.
	returns := make(chan amqp.Return)
	p.basicReturn = returns
	p.c.ch.NotifyReturn(returns)
	go p.listenPubReturn()

	return p
}

// listenPubReturn republishes every message received on p.basicReturn.
//
// Each returned body is logged and published again through Publish; a failed
// republish is logged instead of being silently discarded. The loop then
// sleeps for three seconds before handling the next returned message, and
// ends when p.basicReturn is closed.
func (p *Producer) listenPubReturn() {
	for r := range p.basicReturn {
		fmt.Println("return msg: " + string(r.Body))

		if err := p.Publish(r.Body); err != nil {
			// The message cannot be resent right now (for example while
			// disconnected); surface that instead of dropping it unnoticed.
			fmt.Println("republish returned msg failed: " + err.Error())
		}
		time.Sleep(3 * time.Second)
	}
}

// Publish sends msg to the producer's exchange with its routing key.
//
// The message is published with the mandatory flag set so that the broker
// returns it when it cannot be routed. An error is returned when the
// underlying Conn is marked as disconnected or when the channel publish fails.
func (p *Producer) Publish(msg []byte) error {
	if p.c.isConnected {
		payload := amqp.Publishing{Body: msg}
		return p.c.ch.Publish(p.exchange, p.routingKey, true, false, payload)
	}
	return errors.New("conn is disconnected")
}

mandatory属性选择开启,才会将发送不成功的消息退回

4. Consumer

Consumer 使用 ants 协程池来控制消费协程的数量:
package rabbitx

import (
	"fmt"

	"github.com/panjf2000/ants/v2"
	"github.com/streadway/amqp"
)

const AntPoolSize = 100 // size passed to ants.NewPoolWithFunc: upper bound on consumer goroutines

// Consumer reads deliveries from one queue over the channel owned by a
// shared Conn.
type Consumer struct {
	// c is the shared connection wrapper whose channel is used for consuming.
	c *Conn

	// queueName is the queue passed to Channel.Consume in Consume.
	queueName string
}

// NewConsumer builds a Consumer for queueName and prepares the topology it
// reads from.
//
// It declares the exchange (kind exType) and the queue, creating them if they
// do not exist, and binds the queue to the exchange with routingKey. Any
// failure in these steps panics.
//
// The consumer remembers the name reported by the broker for the declared
// queue, so a server-generated name (empty queueName) is consumed correctly.
func NewConsumer(conn *Conn, exchange string, routingKey string, exType string, queueName string) *Consumer {
	cu := &Consumer{c: conn, queueName: queueName}
	ch := cu.c.ch

	// Declare the exchange; it is created when missing.
	if err := ch.ExchangeDeclare(exchange, exType, true, false, false, false, nil); err != nil {
		panic(err)
	}

	// Declare the queue; it is created when missing.
	q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// Keep the name the broker actually assigned so that Consume targets the
	// same queue that gets bound below.
	cu.queueName = q.Name

	// Bind the queue to the exchange under routingKey.
	if err := ch.QueueBind(q.Name, routingKey, exchange, false, nil); err != nil {
		panic(err)
	}

	return cu
}

// Consume starts consuming the consumer's queue and hands every delivery to
// dofunc on a goroutine pool of size AntPoolSize.
//
// Deliveries are consumed with auto-ack disabled, so dofunc is responsible
// for acknowledging each message. A delivery that cannot be submitted to the
// pool is negatively acknowledged with requeue so the broker can redeliver it
// instead of leaving it unacknowledged.
//
// Consume blocks until the delivery channel is closed and then returns nil.
// It returns an error when consuming cannot be started or the pool cannot be
// created.
func (cu *Consumer) Consume(dofunc func(d amqp.Delivery)) error {
	dChan, err := cu.c.ch.Consume(cu.queueName, "", false, false, false, false, nil)
	if err != nil {
		return err
	}

	// The pool bounds how many deliveries are processed concurrently.
	pool, err := ants.NewPoolWithFunc(AntPoolSize, func(i interface{}) {
		dofunc(i.(amqp.Delivery))
	})
	if err != nil {
		return err
	}
	defer pool.Release()

	for d := range dChan {
		if err := pool.Invoke(d); err != nil {
			fmt.Printf("ants pool error:%s\n", err.Error())
			// dofunc never saw this delivery, so hand it back to the broker.
			if nackErr := d.Nack(false, true); nackErr != nil {
				fmt.Printf("nack error:%s\n", nackErr.Error())
			}
		}
	}

	return nil
}

5. 总结

以上基于 Exchange-RoutingKey-Queue 模型完成了封装,但仍存在以下问题:
  • 生产者,发送消息失败;而自身崩溃了导致消息丢失
  • 消费者,消费消息自身崩溃导致消息重复消费,需要执行函数来过滤