传统的消费模型
抢
消费组
消费组
以下是自己的理解:
比如订阅了两个主题,每个主题的分区是2,则消费组启动 4 个消费实例,每个消费实例对应一个分区,这样避免了 消费实例的竞争。同时也避免了发布/订阅模型 消费实例必须订阅所有分区的问题。
上代码
import (
"context"
"fmt"
"sync"
"github.com/Shopify/sarama"
)
// // 实现 github.com/Shopify/sarama/consumer_group.go/ConsumerGroupHandler 这个接口
type AAAConsumerGroupHandler struct {
}
func (AAAConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (AAAConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
// 这个方法用来消费消息的
func (h AAAConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 获取消息
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
fmt.Println("msg key: ", string(msg.Key))
fmt.Println("msg value: ", string(msg.Value))
// 将消息标记为已使用
sess.MarkMessage(msg, "")
}
return nil
}
// 接收数据
func main() {
// 先初始化 kafka
config := sarama.NewConfig()
// Version 必须大于等于 V0_10_2_0
config.Version = sarama.V0_10_2_1
config.Consumer.Return.Errors = true
fmt.Println("start connect kafka")
// 开始连接kafka服务器
group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "AAA-group", config)
if err != nil {
fmt.Println("connect kafka failed; err", err)
return
}
// 检查错误
go func() {
for err := range group.Errors() {
fmt.Println("group errors : ", err)
}
}()
ctx := context.Background()
fmt.Println("start get msg")
// for 是应对 consumer rebalance
for {
// 需要监听的主题
topics := []string{"test"}
handler := AAAConsumerGroupHandler{}
// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
err := group.Consume(ctx, topics, handler)
if err != nil {
fmt.Println("consume failed; err : ", err)
return
}
}
}