nsq是一个开源的分布式消息队列中间件,用Go语言编写。学习golang的时候正好看到,同时又想深入了解消息队列实现原理和go的特性,就从源码角度分析下。
nsqd
-
Topic: 标识一类消息,各个topic之间相互独立,nsqd会为每个topic创建一个Topic结构
-
Channel:标识一个队列,用来实现消费者之间的负载均衡。一个topic可以有多个channel,发布者的消息会被发送到每一个channel,相当于是广播操作。对于channel中的消息,会任选一个连接的消费者来发送。
如上图所示,对其中的一个topic有三个channel分为是Channel_A、Channel_B、Channel_C,发送消息M1、M2、M3时,消费者A、B、C、D、E收到的消息可能为如图所示。
tcp处理主逻辑
TCPServer
对每一个连接建立一个clientV2结构体,然后调用了protocolV2的IOLoop函数
newClientV2
根据与客户端的Conn连接和context来新建客户端结构
messagePump
新建一个goroutine来处理推送消息给客户端,包括主动发送的心跳、主动发送的消息。
这里可以重点关注下messagePump函数的流程,其中:
- memoryMsgChan: 从memoryMsgChan中获取消息进行处理
- StartInFlightTimeout: 分别调用了以下函数:
- pushInFlightMessage: 将消息放到channel的变量-inFlightMessages字典中,主要是防止消息重复
- addToInFlightPQ:将消息放到channel的变量inFligthPQ中
- SendingMessage:客户端结构体的方法,准备发送消息,即将待发送的消息计数加1,将总消息计数加1
- SendMessage:protocolV2的方法,里面调用了:
- WriteTo:将消息转为bytes数组
- Send:protocolV2的方法,调用:
- client.SetWriteDeadline: 即net.Conn的SetWriteDeadline方法,设置有超时时间的发送,当超时时,返回已经成功发送的字节数。
- protocol.SendFrameResponse:服务器侧的通用函数。用来在数据前面加上一个length头和frame头,然后发送。
memoryMsgChan
依次找到以下调代码:
memoryMsgChan = subChannel.memoryMsgChan
subChannel = <-subEventChan
subEventChan := client.SubEventChan
client.SubEventChan <- channel
最后一句代码所有函数在protocolV2的SUB函数中,作用是拿到指定topic的指定channel,因此memoryMsgChan获取的是上面图中的Channel中的msg。当多个客户端订阅了某个Channel,则这些客户端的protocolV2.memoryMsgChan都指向同一个channel message,因此保证了channel中消息只能被一个客户端收到,也保证了客户端的负载均衡。
loop
loop:处理客户端的请求,然后返回响应给客户端。跟messagePump区别是,一个是处理发送请求,一个是处理响应请求
- client.SetReadDeadline: 即net.Conn的SetReadDeadline,设置读取的超时时间
- client.Reader.ReadSlice:查找和返回\n之前的所有数据,返回的切片在下一次调用时会被覆盖
- p.Exec: 执行用户的请求命令,里面根据请求会调用不同的函数:如SUB/PUB等,执行完成后会返回一个response消息。如SUB是拿到指定的Channel赋值给client。
- p.Send:发送p.Exec返回的响应消息或者错误消息
PUB流程
- io.ReadFull:读取请求内容
- nsqd.GetTopic:根据topic名称获取Topic结构体
- topic.NewMessge:构造推送的消息
- topic.PutMessage: 将消息放到队列中,最终是写入到memoryMsgChan中
- client.PublishedMessage: 客户端发送统计
分析:这里不注意的话,可能没发现topic.memeryMsgChan是怎么发送到各个channel的memoryMsgChan中的,其实是在GetTopic中,如果此topic不存在,是会调用NewTopic来创建topic,在NewTopic中除了新建Topic结构体外,还会新起一个协程来运行messagePump,专门将Topic memoryMsgChan中的消息发送到各个channel中。注意,上述client结构也有一个messagePump函数,负责对单个客户端的消息推送。
nsqlookupd(待完善)
使用nsqlookupd的架构为:
疑问
使用nsqlookupd时,生产者为啥直接连到nsqd,而不是连接到nsqlookupd,但消费者是连接到nsqlookupd的?
Producer:
消费者,在go_nsq中,新建一个消费者的代码为:
producer, err := nsq.NewProducer(address, nsq.NewConfig())
其中address为某个nsqd的地址,如:“127.0.0.1:4150”
生产者是直接连接到nsqd,而消费者是连接到nsqlookupd的,刚开始一直在疑惑为啥生产者不也连接到nsqlookupd,来获取nsqd地址进行消息推送,后面看到这个帖子的说明:
https://github.com/nsqio/go-nsq/issues/170
简单翻译下:这是一个鸡生蛋蛋生鸡的问题,”/lookup”接口只会返回注册了指定topic的nsqd节点,但是假如你还没有发布topic到一个节点的话,没有手动介入的话将不能注册。在推荐的部署拓扑中,是直接在一个非常明确的nsqd上发布消息的(通常是本地)。可以通过“/nodes”接口返回所有的nsqd节点,但是拿到之后你必须决策使用哪一个节点。
总得说来,这里的理由是生产者发布一个topic时,请求nsqlookupd获取nsqd,有两种方式:
1、nsqlookupd从所有注册节点中选择一个节点,返回给生产者,但nsqlookupd没有此功能。
2、nsqlookupd返回所有节点,让生产者来选择,这样可以做到多机容灾。相比来说,比指定地址稍好点。
个人感觉,如果要生产者要做到比较完善的容灾,可以使用方式2来实现,同时也可以控制nsqd的负载均衡,但是量不大,直接用主备两个nsqd来容灾即可。