文章目录
rabbitmq的安装和启动
这里使用docker启动
确保已经安装好docker环境
执行下面命令即可
管理界面在http://localhost:15672(用户名:guest 密码:guest)
# 参考 https://hub.docker.com/_/rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
安装库
github地址:https://github.com/streadway/amqp
go get github.com/streadway/amqp
连接客户端
通过amqp.Dial(url)连接客户端,返回连接(dial)和错误信息
url 由 协议 + 用户名 + 密码 + 地址:端口 构成
# amqp是协议
# 第一个guest是用户名,第二个guest是密码
# localhost:5672 是地址+端口
dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")
channel
写数据,读数据等操作,创建queue,exchange,都需要在channel的基础上进行
所以要先生成channel
通过dial.Channel(),得到channel和错误信息
channel, err := dial.Channel()
创建队列
通过 QueueDeclare 创建队列
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
name:队列名,不填则会自动生成
durable: 为true代表mq挂了自动恢复时,这个队列还在
autoDelete: 为true代表断开连接就删除队列
exclusive: 为true代表只有当前连接可以使用该队列
noWait: 为true则不等待响应值返回
args: 其它附加参数
declare, err := channel.QueueDeclare("", false, false, false, false, nil)
删除队列
通过 QueueDelete 删除队列
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
name:队列名称
ifUnused:为true代表在不使用的情况下被删除
ifEmpty:为true代表在为空的情况下被删除
noWait:为true队列将在不等待服务器响应的情况下被删除
queueDelete, err := channel.QueueDelete(declare.Name, false, false, false)
向队列中写入数据
使用 Publish 写入数据
/**
exchange:exchange名称
key:在不使用exchange的情况下,为队列名称
mandatory:针对exchange,默认填false即可
immediate:是否要求写入的数据,必须被立即收走
Publishing:是 amqp.Publishing struct, 其中Body用于存放数据(byte)
*/
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
err = channel.Publish("", declare.Name, false, false, amqp.Publishing{
Body: []byte ("这里是写入队列中的信息"),
})
从队列中读取数据
使用Consume 从队列中读取数据
go func() {
/**
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
queue:队列名称
consumer:名称
autoAck:是否自动ack
exclusive:只能当前consumer使用
noLocal:填false即可,文档中说明了不再被支持
noWait:不等待返回
args:其他参数
函数返回一个channel,可以从channel中不断获取数据
*/
consumerName := "cm1"
consume, err := channel.Consume(declare.Name, consumerName, true, false, false, false, nil)
if err != nil {
panic(err)
}
for msg := range consume {
fmt.Println(consumerName, "get message", msg.Body)
}
}()
例子1
实现一对一队列的进队,出队
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func main() {
dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")
if err != nil {
panic(err)
}
channel, err := dial.Channel()
declare, err := channel.QueueDeclare("", false, false, false, false, nil)
defer channel.Close()
defer channel.QueueDelete(declare.Name,false,false,false)
go func() {
consumerName := "cm1"
consume, err := channel.Consume(declare.Name, consumerName, true, false, false, false, nil)
if err != nil {
panic(err)
}
for msg := range consume {
fmt.Println(consumerName, "get message", msg.Body)
}
}()
if err != nil {
panic(err)
}
i := 1
for {
time.Sleep(2 * time.Second)
fmt.Println("写入",i)
err = channel.Publish("", declare.Name, false, false, amqp.Publishing{
Body: []byte (fmt.Sprintf("这里是写入队列中的信息 %d", i)),
})
i++
if err != nil {
panic(err)
}
if i > 3 {
break
}
}
}
创建exchange
exchange有什么用呢,他可以将数据按照routeKey分发到对应的队列中去
/**
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
name:名称
kind:分发方式
durable:为true代表mq挂了自动恢复时,这个exchange还在
autoDelete:为true代表断开连接自动删除
internal:为true代表只有内部的exchange之间可以进行消息走动
noWait:是否等待返回
args:其他参数
*/
err = channel.ExchangeDeclare("ex_one", "fanout", false, false, false, false, nil)
if err != nil {
panic(err)
}
绑定exchange和queue
exchange要把数据分发到queue的前提条件是
先创建queue,然后再将queue和exchange绑定
declareOne, err := channel.QueueDeclare("", false, false, false, false, nil)
err = channel.QueueBind(declareOne.Name, "", exchangeName, false, nil)
if err != nil {
panic(err)
}
向exchang中添加数据
这个和向队列中写入数据的唯一区别就是,填写exchang的名称,不填写queue名称
这样就不是直接将数据写入队列,而是靠exchange其定义的规则分发到符合条件的队列之中去
channel.Publish(exchangeName, "", false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
})
例子2
这个例子实现了靠exchange,将相同的消息推送到两个队列中去
保证两个队列都能收到一摸一样的消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
func main() {
dial, err := amqp.Dial("amqp://guest:guest@localhost:5672")
if err != nil {
panic(err)
}
channel, err := dial.Channel()
exchangeName := "ex_one"
err = channel.ExchangeDeclare(exchangeName, "fanout", false, false, false, false, nil)
if err != nil {
panic(err)
}
declareOne, err := channel.QueueDeclare("", false, false, false, false, nil)
declareTwo, err := channel.QueueDeclare("", false, false, false, false, nil)
if err != nil {
panic(err)
}
err = channel.QueueBind(declareOne.Name, "", exchangeName, false, nil)
if err != nil {
panic(err)
}
err = channel.QueueBind(declareTwo.Name, "", exchangeName, false, nil)
if err != nil {
panic(err)
}
for i := 0; i < 10; i++ {
channel.Publish(exchangeName, "", false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
})
}
go consumer(channel, declareOne.Name, "consumer_one")
go consumer(channel, declareTwo.Name, "consumer_two")
time.Sleep(20 * time.Second)
}
func consumer(channel *amqp.Channel, queueName, consumer string) {
consume, err := channel.Consume(queueName, consumer, true, false, false, false, nil)
if err != nil {
panic(err)
}
for msg := range consume {
time.Sleep(time.Second * 1)
fmt.Println("consumerName:", consumer, msg.Body)
}
}