前言:

      Golang社区里有个曾经很火的消息队列nsq,现在貌似不热了。这边有很多的业务都依赖于nsq,python和golang项目还好说,直接TCP长连接挂上去。php是通过nsqd提供的http接口来投递数据。nsq有两种投递发布功能,一种是常规的消息推送(PublishMsg),另一个是延迟消息推送(DeferredPublish),比如我们可以指定一个任务在多久之后才可以被被消费。

     说正题,在用nsq的中间遇到了一个问题,就是并发安全的问题。根据我的各方面测试,普通的PublishMsg并发推送是没有问题的,哪怕多协程粗暴的共用一个连接。但是延迟推送(DeferredPublish)是存在延迟效果失效的问题。不管是多协程共用一个连接,还是一个协程绑定一个连接,延迟消息都存在问题的。当然只是延迟时间失效,但是消息不会丢失。

2018-11-02 更新

问题已经修复了。

     我分析的源代码是nsq的relase版本,但是线上延迟消息异常的nsq版本是rc版本的,不知道哪个神人配置的。。。  至此,nsq的延迟消息问题解决了。。。更新nsq服务端的版本就OK了。 下面的文章其实看不看都可以了….


测试

// xiaorui.cc

import (
	"fmt"
	"sync"
	"time"

	"github.com/bitly/go-nsq"
)

var lock sync.Mutex
var incrlock sync.Mutex
var counter int

func main() {
	config := nsq.NewConfig()
	q, _ := nsq.NewConsumer("aa", "ch", config)
	q.AddHandler(nsq.HandlerFunc(handle))
	q.ConnectToNSQD("xxx")

	delayTS := time.Duration(20) * time.Second
	for index := 0; index < 20; index++ {
		val := fmt.Sprintf("hello id: %d", index)
		go func(idx string) {
			conf := nsq.NewConfig()
			conn, _ := nsq.NewProducer("xxxx", conf)
			conn.DeferredPublish("aa", delayTS, []byte(idx))
            // conn.Close()
		}(val)

		// ok
		// go tool.Nsq.PublishMsg("aa", val)
	}

	go func() {
		for {
			fmt.Println(counter)
			time.Sleep(1 * time.Second)
		}
	}()
	select {}
}

func handle(msg *nsq.Message) error {
	incrlock.Lock()
	defer incrlock.Unlock()

	counter++
	fmt.Println("recv new msg: ", string(msg.Body), time.Now().String())
	return nil
}

下面是tcpdump的抓包,我们能看到确实多个客户端连接,包里面能看到 DPUB的命令及后面的时间戳参数。但问题来了,id:.17这个任务没有了延迟效果, 立马就可以收到。

// xiaorui.cc

14:00:32.618457 IP 192.168.116.205.53061 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
    0x0000:  4500 0048 0000 4000 4006 9dc5 c0a8 74cd  E..H..@.@.....t.
    0x0010:  2f68 380d cf45 1036 f816 c627 7e33 bda7  /h8..E.6...'~3..
    0x0020:  5018 2000 a164 0000 4450 5542 2061 6120  P....d..DPUB.aa.
    0x0030:  3230 3030 300a 0000 000e 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2031 3122                      .id:.11"
14:00:32.618458 IP 192.168.116.205.53064 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
    0x0000:  4500 0048 0000 4000 4006 9dc5 c0a8 74cd  E..H..@.@.....t.
    0x0010:  2f68 380d cf48 1036 afe5 ff88 6441 69ec  /h8..H.6....dAi.
    0x0020:  5018 2000 17df 0000 4450 5542 2061 6120  P.......DPUB.aa.
    0x0030:  3230 3030 300a 0000 000e 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2031 3722                      .id:.17"
14:00:32.618496 IP 192.168.116.205.53059 > xxxx: Flags [P.], seq 339:370, ack 281, win 8192, length 31
    0x0000:  4500 0047 0000 4000 4006 9dc6 c0a8 74cd  E..G..@.@.....t.
    0x0010:  2f68 380d cf43 1036 28a5 719a 065d 678b  /h8..C.6(.q..]g.
    0x0020:  5018 2000 a275 0000 4450 5542 2061 6120  P....u..DPUB.aa.
    0x0030:  3230 3030 300a 0000 000d 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2038 22                        .id:.8"
...
...
...

但还是出现了问题… 

分析go nsq源码

遇到这么诡异的问题,git issue里没人说明。只能看go nsq客户端的源码了, 我们发现其实他的逻辑很简单,每个nsq客户端连接都会go一个router()的协程,这个协程是可以保证消息推送的原子性。 Publish和DeferredPublish调用的都是sendCommandAsync函数, sendCommandAsync会把命令cmd结构发到一个channel里,然后由客户端自己的router协程去消费。另外可以注意到,WriteCommand往socket write数据的时候,也会尝试拿本客户端相关的锁。也就是说,go nsq客户端看起来从两个方面保证了协程 | 线程安全。

// xiaorui.cc


func (w *Producer) Publish(topic string, body []byte) error {
	return w.sendCommand(Publish(topic, body))
}

func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
	return w.sendCommand(DeferredPublish(topic, delay, body))
}

func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    t := <-doneChan
    return t.Error
}

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    ...
    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }

    return nil
}

func (w *Producer) connect() error {
	w.guard.Lock()
	defer w.guard.Unlock()
    ...
	go w.router()

	return nil
}

func (w *Producer) router() {
    for {
        select {
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
    ...
}


func (c *Conn) WriteCommand(cmd *Command) error {
    c.mtx.Lock()

    fmt.Printf("%v cccc %v \n", cmd, string(cmd.Body))
    _, err := cmd.WriteTo(c)
    if err != nil {
        goto exit
    }
    err = c.Flush()

exit:
    c.mtx.Unlock()
    if err != nil {
        c.log(LogLevelError, "IO error - %s", err)
        c.delegate.OnIOError(c, err)
    }
    return err
}

// xiaorui.cc

    很奇怪在多协程多连接下存在异常,go nsq client源码看起来又是合理的,没有发现奇怪的逻辑。 我尝试在所有的DeferredPublish逻辑前后共用一把全局锁,居然可以了。那么说明,貌似存在并发投递下的消息被串改写问题。但接下来我对这两个情况进行tcpdump抓包对比,发现很有意思的事情。加锁,所有的延迟任务是ok的,不加锁,有一些任务没有延迟效果,但是消息不丢。但通过抓包看到他们的body内容是一样的,没有什么字段被丢失或者覆盖。 记得以前给go-nsq和nsq都发过issue,询问他们是否可以加dpub和DeferredPublish的multi批量方法,最后没回复我,不知道有没有一些关联。

nsqd server 延迟消息源码

    我们再来看看nsqd延迟消息的实现原理。nsq对于延迟消息没有用高大上的手段,直接存到本地内存的优先级队列里,因为没有持久化,所以逻辑相对简单。 当nsqd节点发生异常crash,那么数据自然丢失。下面nsqd的源码描述的很清楚,对于普通消息来说,当memoryMsgChan满了后会落盘持久化。对于延迟消息来说,直接就是内存里的优先级队列,没有做什么持久化方案。疑惑的是,这个事情在nsq的文档里没有标注。

// xiaorui.cc

// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()
	if c.Exiting() {
		return errors.New("exiting")
	}
	err := c.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&c.messageCount, 1)
	return nil
}

func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
	atomic.AddUint64(&c.messageCount, 1)
	c.StartDeferredTimeout(msg, timeout)
}

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
	absTs := time.Now().Add(timeout).UnixNano()
	item := &pqueue.Item{Value: msg, Priority: absTs}
	err := c.pushDeferredMessage(item)
	if err != nil {
		return err
	}
	c.addToDeferredPQ(item)
	return nil
}

需要关注的是nsqd对于存放延迟消息的优先级队列操作都有加锁。看起来也没有问题。

// xiaorui.cc

func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
	c.deferredMutex.Lock()
	heap.Push(&c.deferredPQ, item)
	c.deferredMutex.Unlock()
}


func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
	c.deferredMutex.Lock()
	id := item.Value.(*Message).ID
	_, ok := c.deferredMessages[id]
    ...
	c.deferredMutex.Unlock()
	return nil
}

     上面有说过,只需要在go-nsq客户端发送延迟消息时,统一加一把锁,让操作串行化就可以解决延迟失效的问题。问题又来了,在集群环境中又出现该问题了, 简单说单机测试脚本是没有问题,但并发执行脚本就又有出现延迟失效的问题。我个人怀疑是nsq的问题,但是nsqd服务端没有任何错误日志。

// xiaorui.cc

go func(idx string) {
    conf := nsq.NewConfig()
    conn, _ := nsq.NewProducer("ip:port", conf)
    lock.Lock() // 加锁
    conn.DeferredPublish("aa", delayTS, []byte(idx))
    lock.Unlock()
}(val)

最后的解决方法

个人能力有限,搞不定这nsq问题。索性直接抛弃nsq延迟消息的方案,使用redis zset自己实现一个延迟消息机制。 redis最少比nsq的持久化机制靠谱一些,另外nsq延迟消息不能主动去删除,只能等待消费ack删除。而redis是可以直接zrem删除的。

总结:

     现在不确定是go nsq客户端还是nsqd server的问题,单纯看源代码是没什么问题。其实确定是不是客户端的问题,可以用其他原因的客户端,尴尬的是本想用python nsq client测试一波,但pynsq太难用了。好吧,知道怎么该解决问题的朋友可以联系我,也让我学习学习。