1. 生产者生产消息
// main sends one message to the "ffcs" topic through a synchronous producer
// and prints the partition and offset returned by SendMessage.
func main() {
	config := sarama.NewConfig()
	// Wait until the leader and its followers have acknowledged the write.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported back to the producer.
	config.Producer.Return.Successes = true

	// SASL authentication.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}, config)
	if err != nil {
		// This branch is a creation failure, not a close failure.
		fmt.Println("create producer failed, err:", err)
		return
	}
	defer func() {
		// Report a failed shutdown instead of silently dropping the error.
		if err := client.Close(); err != nil {
			fmt.Println("producer close, err:", err)
		}
	}()

	// Build the message.
	msg := &sarama.ProducerMessage{
		Topic: "ffcs",
		Value: sarama.StringEncoder("hello kafka"),
	}

	// Send it and report where it landed.
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
2. 异步发送
// main publishes one random "async: N" message per second to the "ffcs"
// topic through an asynchronous producer. A background goroutine logs the
// delivery results reported on the producer's Errors and Successes channels.
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	p, err := sarama.NewAsyncProducer([]string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}, config)
	if err != nil {
		// This branch is a creation failure, not a close failure.
		fmt.Println("create producer failed, err:", err)
		return
	}
	// NOTE: the send loop below never returns, so this deferred Close only
	// runs if the loop is later given an exit condition.
	defer p.Close()

	go func(p sarama.AsyncProducer) {
		errs, successes := p.Errors(), p.Successes()
		// A closed channel is set to nil so the select stops choosing it;
		// the goroutine ends once both channels are closed.
		for errs != nil || successes != nil {
			select {
			case err, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				if err != nil {
					glog.Errorln(err)
				}
			case _, ok := <-successes:
				if !ok {
					successes = nil
					continue
				}
				logs.Info("消息发送producer成功")
			}
		}
	}(p)

	// Seed the generator once instead of rebuilding it for every message.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		v := "async: " + strconv.Itoa(rng.Intn(10000))
		fmt.Fprintln(os.Stdout, v)
		p.Input() <- &sarama.ProducerMessage{
			Topic: "ffcs",
			Value: sarama.ByteEncoder(v),
		}
		time.Sleep(time.Second)
	}
}
3. 消费者消费消息
// main consumes every partition of the "ffcs" topic from the oldest offset
// and prints each message (or consumer error) as it arrives.
func main() {
	// One topic name is used for both the partition lookup and the
	// consumption, so the two can never disagree.
	const topic = "ffcs"

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	consumer, err := sarama.NewConsumer([]string{"192.168.67.128:9092", "192.168.67.129:9092", "192.168.67.130:9092"}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}
	// Deferred right after creation so every later return path closes it.
	defer consumer.Close()

	// Look up all partition IDs of the topic.
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}

	var wg sync.WaitGroup
	// Iterate over the partition IDs themselves, not their slice indices.
	for _, partition := range partitionList {
		partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			fmt.Printf("try create partition_consumer error %s\n", err.Error())
			return
		}
		defer partitionConsumer.Close()

		// Each partition is drained in its own goroutine; draining inline
		// would block on the first partition and never reach the others.
		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for {
				select {
				case msg, ok := <-pc.Messages():
					if !ok {
						return // channel closed: partition consumer shut down
					}
					fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
						msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
				case err, ok := <-pc.Errors():
					if !ok {
						return
					}
					fmt.Printf("err :%s\n", err.Error())
				}
			}
		}(partitionConsumer)
	}
	wg.Wait()
}
4. 消费者组
// Consumer is the consumer-group handler used in this example.
type Consumer struct {
	// ready is closed by Setup to signal that a session has started.
	ready chan bool
	// consumerName labels this consumer in log output.
	consumerName string
}
// main runs two consumers on the "ffcs" topic and waits until both
// startConsumer calls have returned.
func main() {
	consumer := &Consumer{
		ready:        make(chan bool),
		consumerName: "consumer1",
	}
	consumer2 := &Consumer{
		ready:        make(chan bool),
		consumerName: "consumer2",
	}

	ctx, consumerCancel := context.WithCancel(context.Background())
	defer consumerCancel()

	wg := &sync.WaitGroup{}
	for _, c := range []*Consumer{consumer, consumer2} {
		wg.Add(1)
		go func(c *Consumer) {
			// Every Add is paired with a Done; without it Wait could never
			// return, even when both consumers fail to start.
			defer wg.Done()
			startConsumer(ctx, []string{"ffcs"}, c)
		}(c)
	}
	// NOTE(review): nothing in this function cancels ctx before Wait
	// returns; wire consumerCancel to a shutdown signal if the consumers
	// should be stoppable from outside.
	wg.Wait()
}
// Setup signals readiness by closing the consumer's ready channel.
// It always returns nil.
func (c *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// Cleanup does nothing and always returns nil.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim logs and marks every message delivered for the claim.
// It returns nil when the claim's message channel is closed or when the
// session context is done, whichever happens first.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil // claim released: message channel closed
			}
			logs.Info("ConsumerName:%s Message claimed: value = %s, topic = %s,partition=%d", c.consumerName, string(message.Value), message.Topic, message.Partition)
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Leave promptly when the session ends so this handler does
			// not hold up a rebalance.
			return nil
		}
	}
}
// startConsumer joins the "test-group" consumer group and consumes topics
// with the given handler until ctx is cancelled. It returns early if the
// consumer group cannot be created.
func startConsumer(ctx context.Context, topics []string, consumer *Consumer) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "admin"
	config.Net.SASL.Password = "admin"

	consumerGroup, err := sarama.NewConsumerGroup([]string{"192.168.67.142:9092"}, "test-group", config)
	if err != nil {
		fmt.Printf("NewConsumerGroup create consumer error %s\n", err.Error())
		return
	}
	defer func() {
		if err := consumerGroup.Close(); err != nil {
			fmt.Printf("Error closing consumer group: %v\n", err)
		}
	}()

	// Consumer.Return.Errors is enabled above, so the Errors channel has to
	// be read or those errors are lost.
	go func() {
		for err := range consumerGroup.Errors() {
			fmt.Printf("Error from consumer group: %v\n", err)
		}
	}()

	// Capture the channel before the goroutine below can replace the field.
	ready := consumer.ready

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume returns whenever the session ends (for example on a
			// rebalance), so it is called again in a loop. Passing ctx
			// (not context.Background()) lets cancellation stop it.
			if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
				fmt.Printf("Error from consumer: %v\n", err)
			}
			if ctx.Err() != nil {
				return
			}
			// Setup closes ready; give the next session a fresh channel.
			consumer.ready = make(chan bool)
		}
	}()

	// Wait for the first session to be set up, but do not block forever if
	// the context is cancelled before that happens.
	select {
	case <-ready:
	case <-ctx.Done():
	}

	<-ctx.Done()
	logs.Info("kafka terminating: context cancelled")
	wg.Wait()
}
5. 自定义分区器
// myPartitioner is a custom partitioner for the producer.
type myPartitioner struct {
	// partition is the partition number returned by Partition.
	partition int32
}
// Partition picks the target partition for message. Messages whose encoded
// value contains "chan" go to partition 1 when the topic has that
// partition; all other messages go to p.partition. It returns the encoding
// error if the message value cannot be encoded.
func (p *myPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	encoded, err := message.Value.Encode()
	if err != nil {
		return 0, err
	}

	// The decision depends only on the current message. Returning the old
	// p.partition and then overwriting it would misroute the matching
	// message and send every later message to partition 1.
	const chanPartition int32 = 1
	if strings.Contains(string(encoded), "chan") && chanPartition < numPartitions {
		return chanPartition, nil
	}
	return p.partition, nil
}
// RequiresConsistency always reports false for this partitioner.
func (p *myPartitioner) RequiresConsistency() bool {
	const consistent = false
	return consistent
}
// NewMyPartitioner returns a zero-valued myPartitioner. The topic argument
// is accepted but not used.
func NewMyPartitioner(topic string) sarama.Partitioner {
	partitioner := new(myPartitioner)
	return partitioner
}
// main builds a producer configuration that uses the custom partitioner.
func main() {
	cfg := sarama.NewConfig()
	// Use the custom partitioner constructor.
	cfg.Producer.Partitioner = NewMyPartitioner
	cfg.Producer.RequiredAcks = sarama.WaitForAll
}