Consuming Data from NSQ (Message Queue) in Golang
Prerequisites
Go Channels
NSQ (message queue): https://nsq.io/
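(A quick refresher, not part of the original article.) The efficient approach below relies on one goroutine sending into a channel while another drains it. A minimal, self-contained sketch of that pattern:

package main

import "fmt"

func main() {
	ch := make(chan int, 2) // buffered channel: sends block only when the buffer is full

	// producer goroutine pushes values into the channel
	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
		}
		close(ch) // signals the receiver that no more values are coming
	}()

	// range drains the channel until it is closed
	for v := range ch {
		fmt.Println("received:", v)
	}
}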
Problem Statement
A very common problem in large-scale systems: you have to consume a large volume of data from one service, do some processing, and push the processed data to another service.
For example: consuming roughly 500 messages per second from NSQ, processing each message, and pushing the result into MongoDB/Redis.
Approach #1 (inefficient)
Read messages from NSQ and, inside the consumer's message handler, push each one directly into MongoDB.
package main

import (
	"encoding/json"
	"log"

	nsq "github.com/nsqio/go-nsq"
)

func AddMongoDB(data interface{}) {
	// your code for preprocessing and adding the new value in MongoDB
}

var handleMessage = func(msg *nsq.Message) error {
	var data interface{}
	if err := json.Unmarshal(msg.Body, &data); err != nil {
		return err // returning an error makes NSQ requeue the message
	}
	AddMongoDB(data)
	return nil
}

func InitNsqConsumer() {
	nsqTopic := "dummy_topic"
	nsqChannel := "dummy_channel"
	consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, nsq.NewConfig())
	if err != nil {
		log.Fatal("error creating consumer: ", err)
	}
	consumer.AddHandler(nsq.HandlerFunc(handleMessage))
	err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
	if err != nil {
		log.Print("error connecting to nsqlookupd")
	}
}
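The article leaves AddMongoDB as a stub. Purely as an illustration, a minimal sketch of what it might look like with the official MongoDB Go driver (go.mongodb.org/mongo-driver); the connection URI, the database name "mydb", and the collection name "events" are assumptions, not part of the original post:

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// hypothetical package-level handle; "mydb" and "events" are assumed names
var collection *mongo.Collection

func initMongo() error {
	client, err := mongo.Connect(context.Background(),
		options.Client().ApplyURI("mongodb://127.0.0.1:27017"))
	if err != nil {
		return err
	}
	collection = client.Database("mydb").Collection("events")
	return nil
}

func AddMongoDB(data interface{}) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// one InsertOne per message: this is exactly the per-message round trip
	// that makes this approach slow, as explained below
	if _, err := collection.InsertOne(ctx, data); err != nil {
		log.Print("mongo insert failed: ", err)
	}
}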
What's wrong with it?
Every msg is written to MongoDB individually, so at ~500 messages per second that is ~500 MongoDB writes per second.
Each write to Redis/MongoDB is a full network round trip, and the handler goroutine blocks until it completes, so the database quickly becomes the bottleneck.
This solution does not scale :(
Approach #2 (the efficient way)
Use Go channels and go-routines: instead of writing to the database inside the handler, push each message into a channel and let a dedicated go-routine drain that channel.
var handleMessage = func(msg *nsq.Message) error {
	var data interface{}
	if err := json.Unmarshal(msg.Body, &data); err != nil {
		return err
	}
	consumerChannel <- data
	return nil
}
A separate go-routine consumes from the channel and buffers messages into a bulkArray, flushing the batch to MongoDB either when it reaches 1000 messages or when a timer fires, whichever comes first.
func consumeMessage() {
	interval := time.NewTicker(time.Second * 10) // ticker that fires every 10 seconds
	threshold := 1000
	bulkArray := make([]interface{}, 0)
	for {
		select {
		case <-interval.C:
			if len(bulkArray) > 0 { // flush whatever has accumulated
				AddMongoBulk(bulkArray)
				bulkArray = nil
			}
		case msg := <-consumerChannel:
			bulkArray = append(bulkArray, msg)
			if len(bulkArray) >= threshold { // pushing 1000 messages to MongoDB at a time
				AddMongoBulk(bulkArray)
				bulkArray = nil
			}
		}
	}
}
Complete code (the efficient way)
package main

import (
	"encoding/json"
	"log"
	"time"

	nsq "github.com/nsqio/go-nsq"
)

var consumerChannel chan interface{}

func AddMongoBulk(data []interface{}) {
	// your code for preprocessing and adding new values to MongoDB in bulk
}

var handleMessage = func(msg *nsq.Message) error {
	var data interface{}
	if err := json.Unmarshal(msg.Body, &data); err != nil {
		return err
	}
	consumerChannel <- data
	return nil
}

func consumeMessage() {
	interval := time.NewTicker(time.Second * 10) // ticker that fires every 10 seconds
	threshold := 1000
	bulkArray := make([]interface{}, 0)
	for {
		select {
		case <-interval.C:
			if len(bulkArray) > 0 { // flush whatever has accumulated
				AddMongoBulk(bulkArray)
				bulkArray = nil
			}
		case msg := <-consumerChannel:
			bulkArray = append(bulkArray, msg)
			if len(bulkArray) >= threshold { // pushing 1000 messages to MongoDB at a time
				AddMongoBulk(bulkArray)
				bulkArray = nil
			}
		}
	}
}

func InitNsqConsumer() {
	consumerChannel = make(chan interface{}, 1) // buffer of 1; with buffer = 0 every send would block until the reader is ready
	go consumeMessage()
	// The code below is the same as in Approach #1 (inefficient)
	nsqTopic := "dummy_topic"
	nsqChannel := "dummy_channel"
	consumer, err := nsq.NewConsumer(nsqTopic, nsqChannel, nsq.NewConfig())
	if err != nil {
		log.Fatal("error creating consumer: ", err)
	}
	consumer.AddHandler(nsq.HandlerFunc(handleMessage))
	err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
	if err != nil {
		log.Print("error connecting to nsqlookupd")
	}
}
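The AddMongoBulk stub above can be filled in the same way. A minimal sketch using the same driver's InsertMany (again, the collection handle and names are assumptions carried over from the earlier sketch):

func AddMongoBulk(data []interface{}) {
	if len(data) == 0 {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// a single InsertMany for up to 1000 messages replaces up to 1000
	// InsertOne round trips, which is what makes this approach scale
	if _, err := collection.InsertMany(ctx, data); err != nil {
		log.Print("mongo bulk insert failed: ", err)
	}
}

One design note on the channel buffer: with make(chan interface{}, 1), the NSQ handler blocks whenever the batching go-routine is busy flushing a batch; a larger buffer absorbs bursts at the cost of holding more unacknowledged messages in memory.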
References
[1] Consume data from NSQ message queue in Golang: https://medium.com/@ajs219/consume-data-from-nsq-message-queue-in-golang-98278a63c192