NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。
安装
本机测试时使用的是 windows 环境就独自编译了 nsq 的各模块
go get github.com/nsqio/nsq
cd apps nsqd
go build nsqd.go
nsq 可以搭建 mq 集群,通过 nsqlookupd 发现管理 nsqd 实例,nsqadmin 以 web 的方式管理 nsqd
1.运行 nsqlookupd
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqlookupd>nsqlookupd.exe
[nsqlookupd] 2017/11/07 17:52:46.063484 INFO: nsqlookupd v1.0.0-compat (built w/
go1.8rc2)
[nsqlookupd] 2017/11/07 17:52:46.084485 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2017/11/07 17:52:46.088485 INFO: HTTP: listening on [::]:4161
2.运行 nsqld
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqd>nsqd --lookupd-tcp-address=127.0.0.1:4160 [nsqd] 2017/11/07 17:55:17.983173 INFO: nsqd v1.0.0-compat (built w/go1.8rc2) [nsqd] 2017/11/07 17:55:18.010175 INFO: ID: 710 [nsqd] 2017/11/07 17:55:18.011175 INFO: TOPIC(test): created [nsqd] 2017/11/07 17:55:18.012175
3.运行 nsqadmin
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqadmin>nsqadmin --lookupd-http-address=127.0.0.1:4161 [nsqadmin] 2017/11/07 17:58:30.405179 INFO: nsqadmin v1.0.0-compat (built w/go1.8rc2) [nsqadmin] 2017/11/07 17:58:30.426180 INFO: HTTP: listening on [::]:4171
使用
1.管理
我们可以访问 http://127.0.0.1/ 来管理我们的 nsq
2.创建消息
除了客户端连接创建消息之外我们还可以通过 http 提交消息
3.消费消息
nsq 的 topic 可以创建多个消费 channel,一条消息可以多个通道消费使用:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
// 消费者
type ConsumerT struct{}
// 主函数
func main() {
InitConsumer("test", "test-channel", "127.0.0.1:4161")
for {
time.Sleep(time.Second * 10)
}
}
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
//初始化消费者
func InitConsumer(topic string, channel string, address string) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second //设置重连时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddHandler(&ConsumerT{}) // 添加消费者接口
//建立NSQLookupd连接
if err := c.ConnectToNSQLookupd(address); err != nil {
panic(err)
}
}
运行返回:
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: message1
receive SC-201612261725:4150 message: message2
receive SC-201612261725:4150 message: message2