NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

安装

本机测试时使用的是 windows 环境就独自编译了 nsq 的各模块

go get github.com/nsqio/nsq
cd apps nsqd
go build nsqd.go

nsq 可以搭建 mq 集群,通过 nsqlookupd 发现管理 nsqd 实例,nsqadmin 以 web 的方式管理 nsqd

1.运行 nsqlookupd
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqlookupd>nsqlookupd.exe
[nsqlookupd] 2017/11/07 17:52:46.063484 INFO: nsqlookupd v1.0.0-compat (built w/
go1.8rc2)
[nsqlookupd] 2017/11/07 17:52:46.084485 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2017/11/07 17:52:46.088485 INFO: HTTP: listening on [::]:4161
2.运行 nsqld
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqd>nsqd --lookupd-tcp-address=127.0.0.1:4160                                                                       [nsqd] 2017/11/07 17:55:17.983173 INFO: nsqd v1.0.0-compat (built w/go1.8rc2)   [nsqd] 2017/11/07 17:55:18.010175 INFO: ID: 710                                 [nsqd] 2017/11/07 17:55:18.011175 INFO: TOPIC(test): created                    [nsqd] 2017/11/07 17:55:18.012175 
3.运行 nsqadmin
D:\go\gopath\src\github.com\nsqio\nsq\apps\nsqadmin>nsqadmin --lookupd-http-address=127.0.0.1:4161                                                              [nsqadmin] 2017/11/07 17:58:30.405179 INFO: nsqadmin v1.0.0-compat (built w/go1.8rc2)                                                                           [nsqadmin] 2017/11/07 17:58:30.426180 INFO: HTTP: listening on [::]:4171

使用

1.管理

我们可以访问 http://127.0.0.1/ 来管理我们的 nsq

nsq_glpng

nsq_lppng

2.创建消息

除了客户端连接创建消息之外我们还可以通过 http 提交消息

1510107941206982115png

3.消费消息

nsq 的 topic 可以创建多个消费 channel,一条消息可以多个通道消费使用:

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"time"
)

// 消费者
type ConsumerT struct{}

// 主函数
func main() {
	InitConsumer("test", "test-channel", "127.0.0.1:4161")
	for {
		time.Sleep(time.Second * 10)
	}
}

//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

//初始化消费者
func InitConsumer(topic string, channel string, address string) {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = time.Second          //设置重连时间
	c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)        //屏蔽系统日志
	c.AddHandler(&ConsumerT{}) // 添加消费者接口

	//建立NSQLookupd连接
	if err := c.ConnectToNSQLookupd(address); err != nil {
		panic(err)
	}
}

运行返回:

receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: test
receive SC-201612261725:4150 message: message1
receive SC-201612261725:4150 message: message2
receive SC-201612261725:4150 message: message2