前言:
Golang社区里有个曾经很火的消息队列nsq,现在貌似不热了。这边有很多的业务都依赖于nsq,python和golang项目还好说,直接TCP长连接挂上去。php是通过nsqd提供的http接口来投递数据。nsq有两种投递发布功能,一种是常规的消息推送(PublishMsg),另一个是延迟消息推送(DeferredPublish),比如我们可以指定一个任务在多久之后才可以被被消费。
说正题,在用nsq的中间遇到了一个问题,就是并发安全的问题。根据我的各方面测试,普通的PublishMsg并发推送是没有问题的,哪怕多协程粗暴的共用一个连接。但是延迟推送(DeferredPublish)是存在延迟效果失效的问题。不管是多协程共用一个连接,还是一个协程绑定一个连接,延迟消息都存在问题的。当然只是延迟时间失效,但是消息不会丢失。
2018-11-02 更新
问题已经修复了。
我分析的源代码是nsq的relase版本,但是线上延迟消息异常的nsq版本是rc版本的,不知道哪个神人配置的。。。 至此,nsq的延迟消息问题解决了。。。更新nsq服务端的版本就OK了。 下面的文章其实看不看都可以了….
测试
// xiaorui.cc import ( "fmt" "sync" "time" "github.com/bitly/go-nsq" ) var lock sync.Mutex var incrlock sync.Mutex var counter int func main() { config := nsq.NewConfig() q, _ := nsq.NewConsumer("aa", "ch", config) q.AddHandler(nsq.HandlerFunc(handle)) q.ConnectToNSQD("xxx") delayTS := time.Duration(20) * time.Second for index := 0; index < 20; index++ { val := fmt.Sprintf("hello id: %d", index) go func(idx string) { conf := nsq.NewConfig() conn, _ := nsq.NewProducer("xxxx", conf) conn.DeferredPublish("aa", delayTS, []byte(idx)) // conn.Close() }(val) // ok // go tool.Nsq.PublishMsg("aa", val) } go func() { for { fmt.Println(counter) time.Sleep(1 * time.Second) } }() select {} } func handle(msg *nsq.Message) error { incrlock.Lock() defer incrlock.Unlock() counter++ fmt.Println("recv new msg: ", string(msg.Body), time.Now().String()) return nil }
下面是tcpdump的抓包,我们能看到确实多个客户端连接,包里面能看到 DPUB的命令及后面的时间戳参数。但问题来了,id:.17这个任务没有了延迟效果, 立马就可以收到。
// xiaorui.cc 14:00:32.618457 IP 192.168.116.205.53061 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32 0x0000: 4500 0048 0000 4000 4006 9dc5 c0a8 74cd E..H..@.@.....t. 0x0010: 2f68 380d cf45 1036 f816 c627 7e33 bda7 /h8..E.6...'~3.. 0x0020: 5018 2000 a164 0000 4450 5542 2061 6120 P....d..DPUB.aa. 0x0030: 3230 3030 300a 0000 000e 2268 656c 6c6f 20000....."hello 0x0040: 2069 643a 2031 3122 .id:.11" 14:00:32.618458 IP 192.168.116.205.53064 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32 0x0000: 4500 0048 0000 4000 4006 9dc5 c0a8 74cd E..H..@.@.....t. 0x0010: 2f68 380d cf48 1036 afe5 ff88 6441 69ec /h8..H.6....dAi. 0x0020: 5018 2000 17df 0000 4450 5542 2061 6120 P.......DPUB.aa. 0x0030: 3230 3030 300a 0000 000e 2268 656c 6c6f 20000....."hello 0x0040: 2069 643a 2031 3722 .id:.17" 14:00:32.618496 IP 192.168.116.205.53059 > xxxx: Flags [P.], seq 339:370, ack 281, win 8192, length 31 0x0000: 4500 0047 0000 4000 4006 9dc6 c0a8 74cd E..G..@.@.....t. 0x0010: 2f68 380d cf43 1036 28a5 719a 065d 678b /h8..C.6(.q..]g. 0x0020: 5018 2000 a275 0000 4450 5542 2061 6120 P....u..DPUB.aa. 0x0030: 3230 3030 300a 0000 000d 2268 656c 6c6f 20000....."hello 0x0040: 2069 643a 2038 22 .id:.8" ... ... ...
但还是出现了问题…
分析go nsq源码
遇到这么诡异的问题,git issue里没人说明。只能看go nsq客户端的源码了, 我们发现其实他的逻辑很简单,每个nsq客户端连接都会go一个router()的协程,这个协程是可以保证消息推送的原子性。 Publish和DeferredPublish调用的都是sendCommandAsync函数, sendCommandAsync会把命令cmd结构发到一个channel里,然后由客户端自己的router协程去消费。另外可以注意到,WriteCommand往socket write数据的时候,也会尝试拿本客户端相关的锁。也就是说,go nsq客户端看起来从两个方面保证了协程 | 线程安全。
// xiaorui.cc func (w *Producer) Publish(topic string, body []byte) error { return w.sendCommand(Publish(topic, body)) } func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { return w.sendCommand(DeferredPublish(topic, delay, body)) } func (w *Producer) sendCommand(cmd *Command) error { doneChan := make(chan *ProducerTransaction) err := w.sendCommandAsync(cmd, doneChan, nil) if err != nil { close(doneChan) return err } t := <-doneChan return t.Error } func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, ... if atomic.LoadInt32(&w.state) != StateConnected { err := w.connect() if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil } func (w *Producer) connect() error { w.guard.Lock() defer w.guard.Unlock() ... go w.router() return nil } func (w *Producer) router() { for { select { case t := <-w.transactionChan: w.transactions = append(w.transactions, t) err := w.conn.WriteCommand(t.cmd) if err != nil { w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err) w.close() } ... } func (c *Conn) WriteCommand(cmd *Command) error { c.mtx.Lock() fmt.Printf("%v cccc %v \n", cmd, string(cmd.Body)) _, err := cmd.WriteTo(c) if err != nil { goto exit } err = c.Flush() exit: c.mtx.Unlock() if err != nil { c.log(LogLevelError, "IO error - %s", err) c.delegate.OnIOError(c, err) } return err } // xiaorui.cc
很奇怪在多协程多连接下存在异常,go nsq client源码看起来又是合理的,没有发现奇怪的逻辑。 我尝试在所有的DeferredPublish逻辑前后共用一把全局锁,居然可以了。那么说明,貌似存在并发投递下的消息被串改写问题。但接下来我对这两个情况进行tcpdump抓包对比,发现很有意思的事情。加锁,所有的延迟任务是ok的,不加锁,有一些任务没有延迟效果,但是消息不丢。但通过抓包看到他们的body内容是一样的,没有什么字段被丢失或者覆盖。 记得以前给go-nsq和nsq都发过issue,询问他们是否可以加dpub和DeferredPublish的multi批量方法,最后没回复我,不知道有没有一些关联。
nsqd server 延迟消息源码
我们再来看看nsqd延迟消息的实现原理。nsq对于延迟消息没有用高大上的手段,直接存到本地内存的优先级队列里,因为没有持久化,所以逻辑相对简单。 当nsqd节点发生异常crash,那么数据自然丢失。下面nsqd的源码描述的很清楚,对于普通消息来说,当memoryMsgChan满了后会落盘持久化。对于延迟消息来说,直接就是内存里的优先级队列,没有做什么持久化方案。疑惑的是,这个事情在nsq的文档里没有标注。
// xiaorui.cc // PutMessage writes a Message to the queue func (c *Channel) PutMessage(m *Message) error { c.RLock() defer c.RUnlock() if c.Exiting() { return errors.New("exiting") } err := c.put(m) if err != nil { return err } atomic.AddUint64(&c.messageCount, 1) return nil } func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil } func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) { atomic.AddUint64(&c.messageCount, 1) c.StartDeferredTimeout(msg, timeout) } func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error { absTs := time.Now().Add(timeout).UnixNano() item := &pqueue.Item{Value: msg, Priority: absTs} err := c.pushDeferredMessage(item) if err != nil { return err } c.addToDeferredPQ(item) return nil }
需要关注的是nsqd对于存放延迟消息的优先级队列操作都有加锁。看起来也没有问题。
// xiaorui.cc func (c *Channel) addToDeferredPQ(item *pqueue.Item) { c.deferredMutex.Lock() heap.Push(&c.deferredPQ, item) c.deferredMutex.Unlock() } func (c *Channel) pushDeferredMessage(item *pqueue.Item) error { c.deferredMutex.Lock() id := item.Value.(*Message).ID _, ok := c.deferredMessages[id] ... c.deferredMutex.Unlock() return nil }
上面有说过,只需要在go-nsq客户端发送延迟消息时,统一加一把锁,让操作串行化就可以解决延迟失效的问题。问题又来了,在集群环境中又出现该问题了, 简单说单机测试脚本是没有问题,但并发执行脚本就又有出现延迟失效的问题。我个人怀疑是nsq的问题,但是nsqd服务端没有任何错误日志。
// xiaorui.cc go func(idx string) { conf := nsq.NewConfig() conn, _ := nsq.NewProducer("ip:port", conf) lock.Lock() // 加锁 conn.DeferredPublish("aa", delayTS, []byte(idx)) lock.Unlock() }(val)
最后的解决方法
个人能力有限,搞不定这nsq问题。索性直接抛弃nsq延迟消息的方案,使用redis zset自己实现一个延迟消息机制。 redis最少比nsq的持久化机制靠谱一些,另外nsq延迟消息不能主动去删除,只能等待消费ack删除。而redis是可以直接zrem删除的。
总结:
现在不确定是go nsq客户端还是nsqd server的问题,单纯看源代码是没什么问题。其实确定是不是客户端的问题,可以用其他原因的客户端,尴尬的是本想用python nsq client测试一波,但pynsq太难用了。好吧,知道怎么该解决问题的朋友可以联系我,也让我学习学习。