I am using Kafka 0.10.0 and https://github.com/Shopify/sarama. I am trying to get the offset of the last message that a consumer processed.
To do so, I found the function NewOffsetManagerFromClient(group string, client Client), which requires the group name.
How do I get the consumer group name?
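// Function name and signature are assumed; only the body was posted originally.
func committedOffsets(kafkaHost []string, group, topic string, partitions []int32) (map[int32]int64, error) {
	offsets := make(map[int32]int64)
	config := sarama.NewConfig()
	config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
	config.Version = sarama.V0_10_0_0
	// config.Consumer.Offsets.Initial = sarama.OffsetNewest
	cli, err := sarama.NewClient(kafkaHost, config)
	if err != nil {
		return nil, err
	}
	defer cli.Close()
	offsetManager, err := sarama.NewOffsetManagerFromClient(group, cli)
	if err != nil {
		return nil, err
	}
	for _, partition := range partitions {
		partitionOffsetManager, err := offsetManager.ManagePartition(topic, partition)
		if err != nil {
			return nil, err
		}
		// NextOffset returns the next offset to consume plus its metadata string.
		offset, _ := partitionOffsetManager.NextOffset()
		offsets[partition] = offset
	}
	return offsets, nil
}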
I created a consumer with
consumer, err := sarama.NewConsumer(connections, config)
but I do not know how to create a consumer group and get its group name.