I've recently been working on a Go framework that needs to connect to Kafka, currently through the sarama client. It works fine in development and testing, but for production use sarama has some rough edges. The problems are in its consumer, which cannot be used as-is and needs some extra handling, starting with topics and group IDs.
Recall that Kafka consumers in the same consumer group split a topic's partitions among themselves, so multiple backend instances can share the work of consuming a topic and process messages in parallel, increasing throughput. With plain sarama this fan-out has to be wired up by hand. The code looks like this:
package kafka

import (
	"fmt"
	"strings"
	"sync"

	"game-server/src/common/log"
	"github.com/Shopify/sarama"
)
type KafkaClient struct {
	asyncProducer sarama.AsyncProducer
	syncProducer  sarama.SyncProducer
	consumer      sarama.Consumer
}

func NewKafkaClient() *KafkaClient {
	return &KafkaClient{
		asyncProducer: kafkaAsyncProducer,
		syncProducer:  kafkaSyncProducer,
		consumer:      kafkaConsumer,
	}
}
var kafkaSyncProducer sarama.SyncProducer
var kafkaAsyncProducer sarama.AsyncProducer
var kafkaConsumer sarama.Consumer
func init() {
	brokerListStr := "localhost:9092"
	log.Infof("Init kafka addr:%s", brokerListStr)
	brokerList := strings.Split(brokerListStr, ",")

	config := sarama.NewConfig()
	// With Return.Successes/Errors enabled, the async producer's ack
	// channels must be drained, or it will eventually block.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	asyncProducer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaAsyncProducer = asyncProducer

	syncProducer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaSyncProducer = syncProducer

	consumerConfig := sarama.NewConfig()
	consumer, err := sarama.NewConsumer(brokerList, consumerConfig)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaConsumer = consumer
}
func (kc *KafkaClient) asyncSend(topic string, msg string) (int32, int64, error) {
	p := kc.asyncProducer
	p.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	}
	// Wait for the ack. Note this only pairs correctly when a single
	// goroutine sends at a time; concurrent senders may read each
	// other's acks.
	select {
	case res := <-p.Successes():
		return res.Partition, res.Offset, nil
	case err := <-p.Errors():
		log.Errorln("Produced message failure: ", err)
		return 0, 0, err
	}
}
func (kc *KafkaClient) syncSend(topic string, msg string) (int32, int64, error) {
	// Blocks until the broker acknowledges the message.
	return kc.syncProducer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	})
}
func (kc *KafkaClient) recvMsg(topic string) {
	partitions, err := kc.consumer.Partitions(topic)
	if err != nil {
		log.Fatalln(err.Error())
		return
	}
	log.Infof("kafka receiving msg from topic:%s, partitions:%v", topic, partitions)
	var wg sync.WaitGroup
	// sarama's plain Consumer leaves partition fan-out to the caller:
	// start one goroutine per partition.
	for _, partition := range partitions {
		partitionConsumer, err := kc.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Println("partitionConsumer err:", err)
			continue
		}
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done() // the loop below never returns, so this effectively never fires
			for {
				select {
				case res := <-pc.Messages():
					fmt.Println(string(res.Value))
				case err := <-pc.Errors():
					fmt.Println(err.Error())
				}
			}
		}(partitionConsumer)
	}
	wg.Wait()
}
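For completeness, here is a minimal usage sketch of the client above. The caller, topic name, and payload are placeholders of mine, not part of the original code:

// Hypothetical caller: sends one message synchronously, then blocks
// consuming the topic. "test1" and the payload are illustrative only.
func produceAndConsume() {
	kc := NewKafkaClient()
	partition, offset, err := kc.syncSend("test1", "hello kafka")
	if err != nil {
		log.Errorln("send failed: ", err)
		return
	}
	log.Infof("sent to partition:%d offset:%d", partition, offset)
	kc.recvMsg("test1") // blocks, printing every new message
}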
As the code above makes clear, if the service stops while messages keep arriving on the topic, those messages are lost: consumption always starts from sarama.OffsetNewest, so anything produced during the downtime is skipped, and nothing tracks committed offsets across restarts. Looking into this, I found that the sarama-cluster library (bsm/sarama-cluster) adds consumer-group support on top of sarama and solves exactly this problem. After reworking the code, the consumer finally behaves as intended:
package kafka

import (
	"strings"

	"game-server/src/common/log"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)
type KafkaClient struct {
	asyncProducer sarama.AsyncProducer
	syncProducer  sarama.SyncProducer
	consumer      *cluster.Consumer
}

func NewKafkaClient() *KafkaClient {
	return &KafkaClient{
		asyncProducer: kafkaAsyncProducer,
		syncProducer:  kafkaSyncProducer,
		consumer:      kafkaConsumer,
	}
}
var kafkaSyncProducer sarama.SyncProducer
var kafkaAsyncProducer sarama.AsyncProducer
var kafkaConsumer *cluster.Consumer
func init() {
	brokerListStr := "localhost:9092"
	log.Infof("Init kafka addr:%s", brokerListStr)
	brokerList := strings.Split(brokerListStr, ",")

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	asyncProducer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaAsyncProducer = asyncProducer

	syncProducer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaSyncProducer = syncProducer

	// cluster.NewConsumer joins the consumer group: the library handles
	// partition assignment, and on restart it resumes from the group's
	// committed offsets, which is what fixes the message loss above.
	consumerConfig := cluster.NewConfig()
	groupId := "test_group_id_001"
	topicName1 := "test1"
	consumer, err := cluster.NewConsumer(brokerList, groupId, []string{topicName1}, consumerConfig)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	kafkaConsumer = consumer
}
func (kc *KafkaClient) asyncSend(topic string, msg string) (int32, int64, error) {
	p := kc.asyncProducer
	p.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	}
	// Wait for the ack. As before, this only pairs correctly when a
	// single goroutine sends at a time.
	select {
	case res := <-p.Successes():
		return res.Partition, res.Offset, nil
	case err := <-p.Errors():
		log.Errorln("Produced message failure: ", err)
		return 0, 0, err
	}
}
func (kc *KafkaClient) syncSend(topic string, msg string) (int32, int64, error) {
	// Blocks until the broker acknowledges the message.
	return kc.syncProducer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	})
}
func (kc *KafkaClient) RecvMsg(f func(message *sarama.ConsumerMessage)) {
	consumer := kc.consumer
	// Messages() multiplexes every claimed partition into one channel;
	// the loop ends when the consumer is closed.
	for msg := range consumer.Messages() {
		// Marking before the handler finishes gives at-most-once
		// delivery: a crash inside f can drop a message.
		consumer.MarkOffset(msg, "") // mark message as processed
		go f(msg)
	}
}
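Wiring the consumer up looks roughly like this; the caller and handler body are stand-ins of mine for real business logic:

// Hypothetical caller. Because RecvMsg runs f in a new goroutine per
// message, the handler must be safe for concurrent use.
func startConsumer() {
	kc := NewKafkaClient()
	kc.RecvMsg(func(msg *sarama.ConsumerMessage) {
		log.Infof("%s/%d/%d %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	})
}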
Of course, the example above multiplexes all partitions of all subscribed topics into a single Messages() channel. If you want to handle each partition separately, sarama-cluster also has a partition mode; see the example in its README, sketched below.
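Here is a sketch of that partition mode, adapted from the pattern in the sarama-cluster README; the function name and arguments are mine, for illustration:

// Consume each claimed partition on its own channel instead of the
// multiplexed Messages() channel.
func recvByPartition(brokerList []string, groupId string, topics []string) {
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions
	consumer, err := cluster.NewConsumer(brokerList, groupId, topics, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err)
	}
	defer consumer.Close()
	for {
		// Each partition claim arrives as its own PartitionConsumer,
		// including claims gained after a rebalance.
		pc, ok := <-consumer.Partitions()
		if !ok {
			return
		}
		go func(pc cluster.PartitionConsumer) {
			for msg := range pc.Messages() {
				log.Infof("%s/%d/%d %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
				consumer.MarkOffset(msg, "")
			}
		}(pc)
	}
}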