I've recently been working on a Golang framework that needs to talk to Kafka, currently using sarama for the connection. It works fine in development and testing, but for production use sarama still has some rough edges. The problems are on the consumer side: it can't be used as-is and needs some extra handling, starting with topics and group IDs.

As we know, consumers in the same Kafka consumer group share consumption of a topic: backend service instances can cooperatively consume the topic's messages and process them separately, which increases throughput. With sarama, this has to be wired up by hand. The code looks like this:

package kafka

import (
   "fmt"
   "game-server/src/common/log"
   "github.com/Shopify/sarama"
   "strings"
   "sync"
)

type KafkaClient struct {
   asyncProducer sarama.AsyncProducer
   syncProducer  sarama.SyncProducer
   consumer      sarama.Consumer
}

func NewKafkaClient() *KafkaClient {
   return &KafkaClient{
      asyncProducer: kafkaAsyncProducer,
syncProducer:  kafkaSyncProducer,
      consumer:      kafkaConsumer,
   }
}

var kafkaSyncProducer sarama.SyncProducer
var kafkaAsyncProducer sarama.AsyncProducer
var kafkaConsumer sarama.Consumer

func init() {
   brokerListStr := "localhost:9092"
   log.Infof("Init kafka addr:%s", brokerListStr)
   brokerList := strings.Split(brokerListStr, ",")
   config := sarama.NewConfig()
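   // Return.Successes is off by default; both ack channels are needed by the select in asyncSend.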
   config.Producer.Return.Successes = true
   config.Producer.Return.Errors = true
   asyncProducer, err := sarama.NewAsyncProducer(brokerList, config)
   if err != nil {
      log.Fatalf("kafka connect error:%v", err.Error())
   }
   kafkaAsyncProducer = asyncProducer
   syncProducer, err := sarama.NewSyncProducer(brokerList, config)
   if err != nil {
      log.Fatalf("kafka connect error:%v", err.Error())
   }
   kafkaSyncProducer = syncProducer
   consumerConfig := sarama.NewConfig()
   consumer, err := sarama.NewConsumer(brokerList, consumerConfig)
   if err != nil {
      log.Fatalf("kafka connect error:%v", err.Error())
   }
   kafkaConsumer = consumer
}

func (kc *KafkaClient) asyncSend(topic string, msg string) (int32, int64, error) {
   p := kc.asyncProducer
   p.Input() <- &sarama.ProducerMessage{
      Topic: topic,
      Value: sarama.StringEncoder(msg),
   }
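   // Wait for the ack of the message just sent. Note this pattern assumes asyncSend
   // is not called concurrently; interleaved calls could receive each other's acks.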
   select {
   case res := <-p.Successes():
      return res.Partition, res.Offset, nil
   case err := <-p.Errors():
      log.Errorln("Produced message failure: ", err)
      return 0, 0, err
   }
}

func (kc *KafkaClient) syncSend(topic string, msg string) (int32, int64, error) {
   partition, offset, err := kc.syncProducer.SendMessage(&sarama.ProducerMessage{
      Topic: topic,
      Value: sarama.StringEncoder(msg),
   })
   return partition, offset, err
}

func (kc *KafkaClient) recvMsg(topic string) {
   partitions, err := kc.consumer.Partitions(topic)
   if err != nil {
      log.Errorln(err.Error()) // Fatal would exit the whole process; log and return instead
      return
   }
   log.Infof("kafka receving msg from topic:%s,partitions:%v", topic, partitions)
   var wg sync.WaitGroup
   for _, partition := range partitions {
      partitionConsumer, err := kc.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
      if err != nil {
         fmt.Println("partitionConsumer err:", err)
         continue
      }
      wg.Add(1)
      go func(partitionConsumer sarama.PartitionConsumer) {
         defer wg.Done() // deferred so it runs even if the loop is ever broken out of
         for {
            select {
            case res := <-partitionConsumer.Messages():
               fmt.Println(string(res.Value))
            case err := <-partitionConsumer.Errors():
               fmt.Println(err.Error())
            }
         }
      }(partitionConsumer)
   }
   wg.Wait()
}
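
A minimal sketch of how these helpers might be exercised from within the same package (the topic name and payloads are placeholders):

func exampleUsage() {
   kc := NewKafkaClient()
   if partition, offset, err := kc.syncSend("test1", "hello kafka"); err == nil {
      log.Infof("sync send ok, partition:%d offset:%d", partition, offset)
   }
   if partition, offset, err := kc.asyncSend("test1", "hello again"); err == nil {
      log.Infof("async send ok, partition:%d offset:%d", partition, offset)
   }
   kc.recvMsg("test1") // blocks, printing every incoming message
}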

As the code above makes clear, if the service stops while messages keep arriving in the topic, those messages are lost: each restart consumes from sarama.OffsetNewest, and no offsets are committed anywhere. After digging through documentation and posts online, I found the sarama-cluster library (github.com/bsm/sarama-cluster), which wraps sarama with consumer-group support and solves exactly this problem. With the code reworked as follows, the consumer finally works as intended:

package kafka

import (
	"game-server/src/common/log"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"strings"
)

type KafkaClient struct {
	asyncProducer sarama.AsyncProducer
syncProducer  sarama.SyncProducer
	consumer      *cluster.Consumer
}

func NewKafkaClient() *KafkaClient {
	return &KafkaClient{
		asyncProducer: kafkaAsyncProducer,
syncProducer:  kafkaSyncProducer,
		consumer:      kafkaConsumer,
	}
}

var kafkaSyncProducer sarama.SyncProducer
var kafkaAsyncProducer sarama.AsyncProducer
var kafkaConsumer *cluster.Consumer

func init() {
	brokerListStr := "localhost:9092"
	log.Infof("Init kafka addr:%s", brokerListStr)
	brokerList := strings.Split(brokerListStr, ",")
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	asyncProducer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err.Error())
	}
	kafkaAsyncProducer = asyncProducer
	syncProducer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err.Error())
	}
	kafkaSyncProducer = syncProducer
	consumerConfig := cluster.NewConfig()
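	// Committed group offsets are resumed after a restart; Consumer.Offsets.Initial
	// (OffsetNewest by default) only applies when the group has no committed offset yet.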
	groupId := "test_group_id_001"
	topicName1 := "test1"
	consumer, err := cluster.NewConsumer(brokerList, groupId, []string{topicName1}, consumerConfig)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err.Error())
	}
	kafkaConsumer = consumer
}

func (kc *KafkaClient) asyncSend(topic string, msg string) (int32, int64, error) {
	p := kc.asyncProducer
	p.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	}
	select {
	case res := <-p.Successes():
		return res.Partition, res.Offset, nil
	case err := <-p.Errors():
		log.Errorln("Produced message failure: ", err)
		return 0, 0, err
	}
}

func (kc *KafkaClient) syncSend(topic string, msg string) (int32, int64, error) {
	partition, offset, err := kc.syncProcuder.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(msg),
	})
	return partition, offset, err
}

func (kc *KafkaClient) RecvMsg(f func(message *sarama.ConsumerMessage)) {
	consumer := kc.consumer
	// Messages from every subscribed topic and partition arrive on a single channel;
	// the loop ends when the consumer is closed.
	for msg := range consumer.Messages() {
		// Marking before f runs gives at-most-once delivery: if the process dies
		// while f is still working, that message will not be redelivered.
		consumer.MarkOffset(msg, "") // mark message as processed
		go f(msg)
	}
}
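
Wiring RecvMsg to a handler is then a one-liner; a minimal sketch (the handler body is purely illustrative):

func exampleRecv() {
	kc := NewKafkaClient()
	kc.RecvMsg(func(msg *sarama.ConsumerMessage) {
		// Illustrative handler: log topic/partition/offset and the payload.
		log.Infof("%s/%d/%d %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
	}) // blocks until the consumer is closed
}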

Of course, the example above handles all partitions of all subscribed topics together; to process each partition separately, refer to the example in the sarama-cluster README.
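
For reference, a rough sketch of that per-partition mode, adapted from the sarama-cluster README (broker address, group id, and topic are placeholders):

func recvByPartition() {
	config := cluster.NewConfig()
	// In this mode the consumer exposes one PartitionConsumer per claimed partition.
	config.Group.Mode = cluster.ConsumerModePartitions

	consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "test_group_id_001", []string{"test1"}, config)
	if err != nil {
		log.Fatalf("kafka connect error:%v", err.Error())
	}
	defer consumer.Close()

	// Each claimed partition arrives on Partitions(); consume each in its own goroutine.
	for pc := range consumer.Partitions() {
		go func(pc cluster.PartitionConsumer) {
			for msg := range pc.Messages() {
				log.Infof("%s/%d/%d %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		}(pc)
	}
}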