Go语言操作Kafka文档

本文档详细介绍了如何使用Go语言对Kafka进行基础操作。我们将介绍如何使用Go连接Kafka、生产消息、消费消息。以下是详细操作步骤:

1. 安装驱动

首先,使用以下命令安装Sarama,一个优秀的Kafka Go客户端库:

go get github.com/Shopify/sarama

2. 导入依赖

导入必要的依赖包:

import (
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"
)

3. 生产者(Producer)

创建一个函数,用于连接并返回一个Kafka生产者:

func createProducer(brokers []string) (sarama.AsyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	return sarama.NewAsyncProducer(brokers, config)
}

创建一个函数,用于发送消息到指定的Kafka主题:

func produceMessage(producer sarama.AsyncProducer, topic, value string) {
	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(value),
	}

	producer.Input() <- message
}

4. 消费者(Consumer)

创建一个函数,用于连接并返回一个Kafka消费者:

func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	return sarama.NewConsumerGroup(brokers, groupID, config)
}

定义一个消费者组对象:

type KafkaConsumerGroupHandler struct {
	ready chan bool
}

func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	close(handler.ready)
	return nil
}

func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset)
		fmt.Printf("消息内容: %s\n", string(message.Value))
		sess.MarkMessage(message, "")
	}
	return nil
}

创建一个函数,用于消费指定的Kafka主题:

func consumeMessages(consumer sarama.ConsumerGroup, topics []string) {
	handler := &KafkaConsumerGroupHandler{
		ready: make(chan bool),
	}

	for {
		err := consumer.Consume(context.Background(), topics, handler)
		if err != nil {
			log.Printf("消费者错误: %v", err)
		}

		select {
		case <-handler.ready:
		default:
			return
		}
	}
}

参考代码

main
func main() {
	brokers := strings.Split("localhost:9092", ",")
	topic := "my_topic"
	groupID := "my_group"

	// 创建生产者
	producer, err := createProducer(brokers)
	if err != nil {
		log.Fatal("无法创建生产者:", err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Fatal("无法关闭生产者:", err)
		}
	}()

	// 发送消息
	produceMessage(producer, topic, "hello world")

	// 创建消费者
	consumer, err := createConsumer(brokers, groupID)
	if err != nil {
		log.Fatal("无法创建消费者:", err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatal("无法关闭消费者:", err)
		}
	}()

	topics := []string{topic}
	wg := &sync.WaitGroup{}
	wg.Add(1)

	go func() {
		defer wg.Done()
		consumeMessages(consumer, topics)
	}()

	// 监听退出信号
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, os.Interrupt)
	<-sigterm

	// 优雅关闭消费者
	wg.Wait()
}

当你运行上述程序时,你将首先连接到Kafka集群并创建一个生产者,然后发送一条"hello world"消息到名为 “my_topic” 的主题。接下来,程序创建一个消费者,用于消费刚刚发送的消息并在终端输出消息内容。程序运行过程中,使用Ctrl+C或发送中断信号,可以优雅终止消费者并退出程序。