nsq是一个开源的分布式消息队列中间件,用Go语言编写。学习golang的时候正好看到,同时又想深入了解消息队列实现原理和go的特性,就从源码角度分析下。

nsqd

  • Topic: 标识一类消息,各个topic之间相互独立,nsqd会为每个topic创建一个Topic结构

  • Channel:标识一个队列,用来实现消费者之间的负载均衡。一个topic可以有多个channel,发布者的消息会被发送到每一个channel,相当于是广播操作。对于channel中的消息,会任选一个连接的消费者来发送。
    在这里插入图片描述
    如上图所示,对其中的一个topic有三个channel分为是Channel_A、Channel_B、Channel_C,发送消息M1、M2、M3时,消费者A、B、C、D、E收到的消息可能为如图所示。

tcp处理主逻辑

在这里插入图片描述

TCPServer

对每一个连接建立一个clientV2结构体,然后调用了protocolV2的IOLoop函数

newClientV2

根据与客户端的Conn连接和context来新建客户端结构

messagePump

新建一个goroutine来处理推送消息给客户端,包括主动发送的心跳、主动发送的消息。
在这里插入图片描述
这里可以重点关注下messagePump函数的流程,其中:

  • memoryMsgChan: 从memoryMsgChan中获取消息进行处理
  • StartInFlightTimeout: 分别调用了以下函数:
  • pushInFlightMessage: 将消息放到channel的变量-inFlightMessages字典中,主要是防止消息重复
  • addToInFlightPQ:将消息放到channel的变量inFligthPQ中
  • SendingMessage:客户端结构体的方法,准备发送消息,即将待发送的消息计数加1,将总消息计数加1
  • SendMessage:protocolV2的方法,里面调用了:
    • WriteTo:将消息转为bytes数组
    • Send:protocolV2的方法,调用:
      • client.SetWriteDeadline: 即net.Conn的SetWriteDeadline方法,设置有超时时间的发送,当超时时,返回已经成功发送的字节数。
      • protocol.SendFrameResponse:服务器侧的通用函数。用来在数据前面加上一个length头和frame头,然后发送。

memoryMsgChan

依次找到以下调代码:

memoryMsgChan = subChannel.memoryMsgChan
subChannel = <-subEventChan
subEventChan := client.SubEventChan
client.SubEventChan <- channel 

最后一句代码所有函数在protocolV2的SUB函数中,作用是拿到指定topic的指定channel,因此memoryMsgChan获取的是上面图中的Channel中的msg。当多个客户端订阅了某个Channel,则这些客户端的protocolV2.memoryMsgChan都指向同一个channel message,因此保证了channel中消息只能被一个客户端收到,也保证了客户端的负载均衡。

loop

loop:处理客户端的请求,然后返回响应给客户端。跟messagePump区别是,一个是处理发送请求,一个是处理响应请求

  • client.SetReadDeadline: 即net.Conn的SetReadDeadline,设置读取的超时时间
  • client.Reader.ReadSlice:查找和返回\n之前的所有数据,返回的切片在下一次调用时会被覆盖
  • p.Exec: 执行用户的请求命令,里面根据请求会调用不同的函数:如SUB/PUB等,执行完成后会返回一个response消息。如SUB是拿到指定的Channel赋值给client。
  • p.Send:发送p.Exec返回的响应消息或者错误消息

PUB流程

在这里插入图片描述

  • io.ReadFull:读取请求内容
  • nsqd.GetTopic:根据topic名称获取Topic结构体
  • topic.NewMessge:构造推送的消息
  • topic.PutMessage: 将消息放到队列中,最终是写入到memoryMsgChan中
  • client.PublishedMessage: 客户端发送统计

分析:这里不注意的话,可能没发现topic.memeryMsgChan是怎么发送到各个channel的memoryMsgChan中的,其实是在GetTopic中,如果此topic不存在,是会调用NewTopic来创建topic,在NewTopic中除了新建Topic结构体外,还会新起一个协程来运行messagePump,专门将Topic memoryMsgChan中的消息发送到各个channel中。注意,上述client结构也有一个messagePump函数,负责对单个客户端的消息推送。

nsqlookupd(待完善)

使用nsqlookupd的架构为:
在这里插入图片描述

疑问

使用nsqlookupd时,生产者为啥直接连到nsqd,而不是连接到nsqlookupd,但消费者是连接到nsqlookupd的?

Producer:
消费者,在go_nsq中,新建一个消费者的代码为:

producer, err := nsq.NewProducer(address, nsq.NewConfig())

其中address为某个nsqd的地址,如:“127.0.0.1:4150”

生产者是直接连接到nsqd,而消费者是连接到nsqlookupd的,刚开始一直在疑惑为啥生产者不也连接到nsqlookupd,来获取nsqd地址进行消息推送,后面看到这个帖子的说明:
https://github.com/nsqio/go-nsq/issues/170
简单翻译下:这是一个鸡生蛋蛋生鸡的问题,”/lookup”接口只会返回注册了指定topic的nsqd节点,但是假如你还没有发布topic到一个节点的话,没有手动介入的话将不能注册。在推荐的部署拓扑中,是直接在一个非常明确的nsqd上发布消息的(通常是本地)。可以通过“/nodes”接口返回所有的nsqd节点,但是拿到之后你必须决策使用哪一个节点。

总得说来,这里的理由是生产者发布一个topic时,请求nsqlookupd获取nsqd,有两种方式:
1、nsqlookupd从所有注册节点中选择一个节点,返回给生产者,但nsqlookupd没有此功能。
2、nsqlookupd返回所有节点,让生产者来选择,这样可以做到多机容灾。相比来说,比指定地址稍好点。
个人感觉,如果要生产者要做到比较完善的容灾,可以使用方式2来实现,同时也可以控制nsqd的负载均衡,但是量不大,直接用主备两个nsqd来容灾即可。