NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。
部署
安装步骤
# 下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 解压
tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 启动服务
cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
使用
1、创建一个test主题,并发送一个hello world消息
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
http://127.0.0.1:4171/
image.png
3 消费test主题的消息
$ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
2019/03/13 11:09:49 syncing 1 records to disk
$ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
hello world
客户端
生产者可使用PHP curl 直接处理,github有许多现成的客户端可以使用
<?php
$msg="最简单的发送消息方式!";
$url= "http://127.0.0.1:4151/pub?topic=test";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: text/html; charset=utf-8',
'Content-Length: ' . strlen($msg))
);
$output = curl_exec($ch);
if($output === FALSE ){
echo "CURL Error:".curl_error($ch);
}
使用go来处理
代码目录结构如下(示例项目,通过gin封装的,学习使用)
image.png
nsq.go 简单封装nsq操作
package servers
import (
"fmt"
"github.com/nsqio/go-nsq"
)
// 默认配置
const HOST = "127.0.0.1:4150"
const TOPIC_NAME = "test"
const CHANNEL_NAME = "test-channel"
// 启动Nsq
func NsqRun() {
Consumer()
}
// nsq发布消息
func Producer(msgBody string) {
// 新建生产者
p, err := nsq.NewProducer(HOST, nsq.NewConfig())
if err != nil {
panic(err)
}
// 发布消息
if err := p.Publish(TOPIC_NAME, []byte(msgBody)); err != nil {
panic(err)
}
}
// nsq订阅消息
type ConsumerT struct{}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
func Consumer() {
c, err := nsq.NewConsumer(TOPIC_NAME, CHANNEL_NAME, nsq.NewConfig()) // 新建一个消费者
if err != nil {
panic(err)
}
c.AddHandler(&ConsumerT{}) // 添加消息处理
if err := c.ConnectToNSQD(HOST); err != nil { // 建立连接
panic(err)
}
}
main.go 项目入口文件
package main
import (
"github.com/gin-gonic/gin"
"wages_service/servers"
"wages_service/tasks"
)
var GinEngine *gin.Engine
func main() {
// 运行 task
tasks.SyncDataRun()
// 运行 nsq
servers.NsqRun()
// 运行server
servers.HttpRun(GinEngine)
}
nsq_producer.go 我们测试用来发送消息的
package main
import "wages_service/servers"
func main() {
// 发送消息到nsq
servers.Producer("hello world!!!")
}
运行测试
- 启动项目
image.png
-
推送消息
image.png 查看结果
image.png