1. 概述

NSQNSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。 它促进了没有单点故障的分布式和分散式拓扑,从而实现了容错能力和高可用性,并提供了可靠的消息传递保证。 查看功能和保证。 从操作上讲,NSQ易于配置和部署(所有参数均在命令行上指定,并且编译的二进制文件不具有运行时相关性)。 为了获得最大的灵活性,它与数据格式无关(消息可以是JSON,MsgPack,协议缓冲区或其他任何东西)。 官方提供了Go和Python库(以及许多其他客户端库),并且,如果您有兴趣构建自己的库,则有一个协议规范。

2. 基础应用场景

系统解耦异步处理流量削峰消息通信

3. 相关文档

4.安装操作

-rwxr-xr-x 1 captain 197121 5515776 8月  28 13:46 nsq_stat.exe*
-rwxr-xr-x 1 captain 197121 5823488 8月  28 13:46 nsq_tail.exe*
-rwxr-xr-x 1 captain 197121 5997568 8月  28 13:46 nsq_to_file.exe*
-rwxr-xr-x 1 captain 197121 5923840 8月  28 13:46 nsq_to_http.exe*
-rwxr-xr-x 1 captain 197121 5903872 8月  28 13:46 nsq_to_nsq.exe*
-rwxr-xr-x 1 captain 197121 8787968 8月  28 13:46 nsqadmin.exe*
-rwxr-xr-x 1 captain 197121 9108992 8月  28 13:46 nsqd.exe*
-rwxr-xr-x 1 captain 197121 8384000 8月  28 13:46 nsqlookupd.exe*
-rwxr-xr-x 1 captain 197121 5639680 8月  28 13:46 to_nsq.exe*

5. NSQ服务端基础组件介绍

5.1 nsqd

接收排队消息传递topicschannel

5.2 nsqlookupd

nsqlookupdtopicnsqdnsqdtopicchannel

5.3 nsqadmin

nsqadmin

重点提示:

nsqdnsqlookupdnsqadmin-- helpnsqdnsqlookupd

6.操作NSQ

6.1 安装客户端

go get -u github.com/nsqio/go-nsq

6.1 单机启动nsqd

nsqd
$ ./nsqd
[nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825
[nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created
[nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa)
[nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150

我们同样可以指定端口

$ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081"
[nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825
[nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081
[nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080

nsqd
topicChannel“topics”topic“channels”channeltopicchanneltopictopicchanneltopictopictopicchannelchannelchanneltopicchannelchenneltopicchanneltopicchannelchanneltopicchannel消息消费者
6.1.1 单NSQ的使用
nsq_single_product.go
package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"time"
)
func main() {
	nsqAddr := "127.0.0.1:8081"
	conf :=nsq.NewConfig()
	p ,err := nsq.NewProducer(nsqAddr,conf)
	if err != nil {
		fmt.Println(err)
		return
	}
	for  {
		message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
		fmt.Println(message)
		// 发送消息
		p.Publish("topic-demo1",[]byte(message))
		time.Sleep(time.Second)
	}

}

编写一个消息消费者

nsq_single_consumer.go
package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
	addr := msg.NSQDAddress
	message := string(msg.Body)
	fmt.Println(addr, message)
	return
}
func MyConsumers(topic, channel, addr string) {
	conf := nsq.NewConfig()
	new_consumer, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {

	}
	// 接收消息
	new_handler := &NewHandler{}
	new_consumer.AddHandler(new_handler)
	err = new_consumer.ConnectToNSQD(addr)
	if err != nil {

	}
}
func main() {
	addr := "127.0.0.1:8081"
	go MyConsumers("topic-demo1", "channel-aa", addr)
    // 模拟多个从多个channel去消息
	go MyConsumers("topic-demo1", "channel-bb", addr)
	select {}
}

6.1.2 通过nsqadmin查看
nsqadminnsqadmin
$ ./nsqadmin --nsqd-http-address="127.0.0.1:8080"
[nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171
http://127.0.0.1:4171/

在这里插入图片描述
在这里插入图片描述

6.1.3 NSQ的单点结构

在这里插入图片描述

6.3 NSQ集群

6.3.1 启动NSQ各组件

构建一个NSQ的基础拓扑结构

nsqlookupdnsqd
nsqlookupdnsqlookupd--help
$ ./nsqlookupd
[nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160

nsqd-lookupd-tcp-address

添加第一个实例

./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"

添加第二个实例

 ./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"

-lookupd-http-address
$ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"
nsqadmin

在这里插入图片描述

在这里插入图片描述

6.3.2 NSQ的拓扑结构

在这里插入图片描述

nsqdnsqd
6.3.3 Go语言操作NSQ代码示例

消息生产者

nsq_cluster_product.go
package main

import (
	"bufio"
	"fmt"
	"github.com/nsqio/go-nsq"
	"log"
	"os"
	"strings"
)

var pro *nsq.Producer

func NewPro(addr string) (err error) {
	conf := nsq.NewConfig()
	pro, err = nsq.NewProducer(addr, conf)
	if err != nil {
		log.Println(err)
		return err
	}
	return nil
}
func main() {
	nsqAddr := "127.0.0.1:8091"
	err := NewPro(nsqAddr)
	if err != nil {
		fmt.Println(err)
		return
	}else{
		fmt.Println("connect 127.0.0.1:8091 success")
	}
	// 读取标准输入
	reader := bufio.NewReader(os.Stdin)
	for {
		// 读取所有内容直到遇见回车(\n)
		data, err := reader.ReadString('\n')
		if err != nil {
			fmt.Println("read data from stdin is field : ", err)
			continue
		}
		// 当输入q的时候退出
		data = strings.TrimSpace(data)
		if strings.ToUpper(data) == "Q" {
			break
		}
		err = pro.Publish("topic-demo1", []byte(data))
		if err != nil {
			fmt.Println("nsq publish is field ", err)
			continue
		}
	}
	fmt.Println("exit !")
}

消息消费者

nsq_cluster_consumer.go
package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
)

type Handler struct{}

func (m *Handler) HandleMessage(msg *nsq.Message) (err error) {
	addr := msg.NSQDAddress
	message := string(msg.Body)
	fmt.Println(addr, message)
	return
}
func NewConsumers(t string, c string, addr string) error {
	conf := nsq.NewConfig()
	nc, err := nsq.NewConsumer(t, c, conf)
	if err != nil {
		fmt.Println("create consumer failed err ", err)
		return err
	}
	consumer := &Handler{}
	nc.AddHandler(consumer)
	// 连接nsqlookupd
	if err:= nc.ConnectToNSQLookupd(addr);err!=nil{
		fmt.Println("connect nsqlookupd failed ", err)
		return err
	}
	return nil
}
func main() {
	// 这是nsqlookupd的地址
	addr := "127.0.0.1:4161"
	err := NewConsumers("topic-demo1", "channel-aa", addr)
	if err != nil {
		fmt.Println("new nsq consumer failed", err)
		return
	}
	select {}
}