在 Golang 中消费来自 NSQ(消息队列)的数据

先决条件

Go Channels

NSQ(message queue) :https://nsq.io/

问题陈述

大规模系统中一个非常常见的问题是:您必须从一个服务中消费大量数据,进行一些处理并将处理后的数据推送到另一个服务

NSQ500MongoDB/Redis

方法# 1(效率低)

NSQ
MongoDBconsumer
package main
import (
 nsq "github.com/nsqio/go-nsq"
)

func AddMongoDB(data){
 // your code for preprocessing and adding new value in mongoDB
}

var handleMessage = func(msg *nsq.Message) error {
 var data interface{}
 _ := json.Unmarshal(msg.Body, &data)

 AddMongoDB(data)
 return nil
}

func InitNsqConsumer(){
 nsqTopic := "dummy_topic"
 nsqChannel := "dummy_channel"

 consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, nsq.NewConfig())
 consumer.AddHandler(nsq.HandleFunc(handleMessage))

 err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
 if err != nil{
  log.Print("error connecting to nsqlookupd")
 }
}

有什么问题。

msgMongoDB500

  • Redis/MongoDBgoroutine

    此解决方案无法扩展:(

    方法2(有效的方法)

    channelsgo-routineschannels
    var handleMessage = func(msg *nsq.Message) error {
     var data interface{}
     _ := json.Unmarshal(msg.Body, &data)

     consumerChannel <- data
     return nil
    }

    gochannels
    hannelsMongoDBbulkArray1000
    func consumeMessage(){
     interval := time.NewTicker(time.Second * 10) // ticker of every 10 second
     threshold := 1000
     bulkArray = make([]interface, 0)

     for {
      select {
      case <- interval.C:
       AddMongoBulk(bulkArray)
       bulkArray = nil

      case msg := <-consumerChannel:
       bulkArray = append(bulkArray, msg)
       if len(bulkArray) >= threshold { // pushing 1000 messages to MongoDB at a time
        AddMongoBulk(bulkArray)
        bulkArray = nil
       }
      }
     }
    }

    完整的代码(有效的方法)

    package main
    import (
     nsq "github.com/nsqio/go-nsq"
    )

    var consumerChannel chan interface

    func AddMongoBulk(data){
     // your code for preprocessing and adding new value in mongoDB in bulk
    }

    var handleMessage = func(msg *nsq.Message) error {
     var data interface{}
     _ := json.Unmarshal(msg.Body, &data)

     consumerChannel <- data
     return nil
    }

    func consumeMessage(){
     interval := time.NewTicker(time.Second * 10) // ticker of every 10 second
     threshold := 1000
     bulkArray = make([]interface, 0)

     for {
      select {
      case <- interval.C:
       AddMongoBulk(bulkArray)
       bulkArray = nil

      case msg := <-consumerChannel:
       bulkArray = append(bulkArray, msg)
       if len(bulkArray) >= threshold { // pushing 1000 messages to MongoDB at a time
        AddMongoBulk(bulkArray)
        bulkArray = nil
       }
      }
     }
    }

    func InitNsqConsumer(){
     consumerChannel = make(chan interface, 1) // channel of buffer 1, if buffer = 0, it will be a blocking channel
     go consumeMessage()

     // Below code is same as Method #1(inefficient) solution
     nsqTopic := "dummy_topic"
     nsqChannel := "dummy_channel"
     consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, nsq.NewConfig())
     consumer.AddHandler(nsq.HandleFunc(handleMessage))
     err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
     if err != nil{
      log.Print("error connecting to nsqlookupd")
     }
    }


    参考资料

    参考[1]

    参考资料

    [1]

    参考: https://medium.com/@ajs219/consume-data-from-nsq-message-queue-in-golang-98278a63c192