文章目录

rabbitmq的安装和启动

这里使用docker启动
确保已经安装好docker环境
执行下面命令即可
管理界面在http://localhost:15672(用户名:guest 密码:guest)

# 参考 https://hub.docker.com/_/rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

安装库

github地址:https://github.com/streadway/amqp

go get github.com/streadway/amqp

连接客户端

通过amqp.Dial(url)连接客户端,返回连接(dial)和错误信息
url 由 协议 + 用户名 + 密码 + 地址:端口 构成

# amqp是协议
# 第一个guest是用户名,第二个guest是密码
# localhost:5672 是地址+端口
dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")

channel

写数据,读数据等操作,创建queue,exchange,都需要在channel的基础上进行
所以要先生成channel
通过dial.Channel(),得到channel和错误信息

channel, err := dial.Channel()

创建队列

通过 QueueDeclare 创建队列
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
name:队列名,不填则会自动生成
durable: 为true代表mq挂了自动恢复时,这个队列还在
autoDelete: 为true代表断开连接就删除队列
exclusive: 为true代表只有当前连接可以使用该队列
noWait: 为true则不等待响应值返回
args: 其它附加参数

declare, err := channel.QueueDeclare("", false, false, false, false, nil)

删除队列

通过 QueueDelete 删除队列
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
name:队列名称
ifUnused:为true代表在不使用的情况下被删除
ifEmpty:为true代表在为空的情况下被删除
noWait:为true队列将在不等待服务器响应的情况下被删除

	queueDelete, err := channel.QueueDelete(declare.Name, false, false, false)

向队列中写入数据

使用 Publish 写入数据

	/**
	 exchange:exchange名称
	 key:在不使用exchange的情况下,为队列名称
	 mandatory:针对exchange,默认填false即可
	 immediate:是否要求写入的数据,必须被立即收走
	 Publishing:是 amqp.Publishing struct, 其中Body用于存放数据(byte)
	 */
    func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error 

	err = channel.Publish("", declare.Name, false, false, amqp.Publishing{
		Body: []byte ("这里是写入队列中的信息"),
	})

从队列中读取数据

使用Consume 从队列中读取数据

	go func() {
		/**
		func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
		queue:队列名称
		consumer:名称
		autoAck:是否自动ack
		exclusive:只能当前consumer使用
		noLocal:填false即可,文档中说明了不再被支持
        noWait:不等待返回
		args:其他参数
		函数返回一个channel,可以从channel中不断获取数据
		*/
		consumerName := "cm1"
		consume, err := channel.Consume(declare.Name, consumerName, true, false, false, false, nil)

		if err != nil {
			panic(err)
		}
		for msg := range consume {
			fmt.Println(consumerName, "get message", msg.Body)
		}

	}()

例子1

实现一对一队列的进队,出队

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	if err != nil {
		panic(err)
	}
	channel, err := dial.Channel()
	declare, err := channel.QueueDeclare("", false, false, false, false, nil)

	defer channel.Close()
	defer channel.QueueDelete(declare.Name,false,false,false)

	go func() {
		consumerName := "cm1"
		consume, err := channel.Consume(declare.Name, consumerName, true, false, false, false, nil)

		if err != nil {
			panic(err)
		}
		for msg := range consume {
			fmt.Println(consumerName, "get message", msg.Body)
		}

	}()

	if err != nil {
		panic(err)
	}

	i := 1

	for {
		time.Sleep(2 * time.Second)

		fmt.Println("写入",i)

		err = channel.Publish("", declare.Name, false, false, amqp.Publishing{
			Body: []byte (fmt.Sprintf("这里是写入队列中的信息 %d", i)),
		})

		i++

		if err != nil {
			panic(err)
		}

		if i > 3 {
			break
		}
	}
}

创建exchange

exchange有什么用呢,他可以将数据按照routeKey分发到对应的队列中去

	/**
	  func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
	  name:名称
	  kind:分发方式
	  durable:为true代表mq挂了自动恢复时,这个exchange还在
	  autoDelete:为true代表断开连接自动删除
	  internal:为true代表只有内部的exchange之间可以进行消息走动
	  noWait:是否等待返回
	  args:其他参数
	*/
	err = channel.ExchangeDeclare("ex_one", "fanout", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

绑定exchange和queue

exchange要把数据分发到queue的前提条件是
先创建queue,然后再将queue和exchange绑定

	declareOne, err := channel.QueueDeclare("", false, false, false, false, nil)
    	err = channel.QueueBind(declareOne.Name, "", exchangeName, false, nil)
	if err != nil {
		panic(err)
	}

向exchang中添加数据

这个和向队列中写入数据的唯一区别就是,填写exchang的名称,不填写queue名称
这样就不是直接将数据写入队列,而是靠exchange其定义的规则分发到符合条件的队列之中去

		channel.Publish(exchangeName, "", false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("%d", i)),
		})

例子2

这个例子实现了靠exchange,将相同的消息推送到两个队列中去
保证两个队列都能收到一摸一样的消息

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")
	if err != nil {
		panic(err)
	}
	channel, err := dial.Channel()
	
	exchangeName := "ex_one"
	err = channel.ExchangeDeclare(exchangeName, "fanout", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	declareOne, err := channel.QueueDeclare("", false, false, false, false, nil)
	declareTwo, err := channel.QueueDeclare("", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	err = channel.QueueBind(declareOne.Name, "", exchangeName, false, nil)
	if err != nil {
		panic(err)
	}
	err = channel.QueueBind(declareTwo.Name, "", exchangeName, false, nil)
	if err != nil {
		panic(err)
	}



	for i := 0; i < 10; i++ {
		channel.Publish(exchangeName, "", false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("%d", i)),
		})
	}

	go consumer(channel, declareOne.Name, "consumer_one")
	go consumer(channel, declareTwo.Name, "consumer_two")

	time.Sleep(20 * time.Second)
}

func consumer(channel *amqp.Channel, queueName, consumer string) {
	consume, err := channel.Consume(queueName, consumer, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	for msg := range consume {
		time.Sleep(time.Second * 1)
		fmt.Println("consumerName:", consumer, msg.Body)
	}
}