1.1.1 生产者源码

package main

import (
   "fmt"

   "github.com/Shopify/sarama"
)

func main() {
   // 创建配置信息
   config := sarama.NewConfig()

   // 设置
   // ack应答机制:ack可以看做一种信号,用于消费者来确认消息是否落盘
   config.Producer.RequiredAcks = sarama.WaitForAll

   // 发送分区
   // Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key). Similar to the `partitioner.class` setting for the JVM producer.
   // 生成用于选择要将消息发送到的分区的分区标识符(默认为散列消息键)。类似于JVM生成器的' partitioner.class '设置。
   config.Producer.Partitioner = sarama.NewRandomPartitioner

   // 回复确认
   // Return specifies what channels will be populated. If they are set to true, you must read from the respective channels to prevent deadlock. If, however, this config is used to create a `SyncProducer`, both must be set to true and you shall not read from the channels since the producer does this internally.
   // Return指定将填充哪些通道。如果将它们设置为true,则必须从相应的通道读取以防止死锁。然而,如果这个配置被用来创建一个“SyncProducer”,两者都必须设置为true,并且你不能从通道中读取,因为生产者内部会这样做。
   config.Producer.Return.Successes = true

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "message"
   msg.Value = sarama.StringEncoder("【DAL科技】您正在注册DAL科技信息短信平台账号,验证码是:888888,3分钟内有效,请及时输入。")

   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"192.168.18.101:9092"}, config)
   if err != nil {
      fmt.Println("创建生产者失败:", err)
   }
   defer client.Close()

   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("发送消息失败:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v \n ", pid, offset)

}

1.1.2 消费者源码

package main

import (
   "fmt"
   "sync"

   "github.com/Shopify/sarama"
)

var wg sync.WaitGroup

func main() {
   //创建新的消费者
   consumer, err := sarama.NewConsumer([]string{"192.168.18.101:9092"}, nil)
   if err != nil {
      fmt.Println("创建消费者失败:", err)
   }

   //根据topic获取所有的分区列表
   partitionList, err := consumer.Partitions("message")
   if err != nil {
      fmt.Println("根据topic获取分区列表失败:", err)
   }
   fmt.Println(partitionList)

   //遍历所有的分区
   for p := range partitionList {
      //针对每一个分区创建一个对应分区的消费者
      pc, err := consumer.ConsumePartition("message", int32(p), sarama.OffsetNewest)
      if err != nil {
         fmt.Printf("消费者消费指定分区失败: 分区 %d, 错误:%v\n", p, err)
      }
      defer pc.AsyncClose()
      wg.Add(1)

      //异步从每个分区消费信息
      go func(sarama.PartitionConsumer) {
         for msg := range pc.Messages() {
            fmt.Printf("partition:%d offset:%d key:%v value:%s \n",
               msg.Partition, msg.Offset, msg.Key, msg.Value)
         }
      }(pc)
   }
   wg.Wait()
}

1.1.3 运行方式

安装依赖:go mod tidy

运行消费者:go run examples/z01_hello_old/consumer/main.go

运行生产者:go run examples/z01_hello_old/producer/main.go

注意:消费者启动以后,可以多运行几次生产者,然后查看控制台。消费者能够循环监听来自生产者生产的消息。

1.1.4 生产者源码解析

// 创建配置信息
config := sarama.NewConfig()

// 设置
// ack应答机制:ack可以看做一种信号,用于消费者来确认消息是否落盘
// WaitForAll waits for all in-sync replicas to commit before responding. The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key.
// WaitForAll等待所有同步副本在响应之前提交。同步副本的最小数量通过' min.insync '在代理上配置。副本的关键配置。
config.Producer.RequiredAcks = sarama.WaitForAll

// 发送分区
// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key). Similar to the `partitioner.class` setting for the JVM producer.
// 生成用于选择要将消息发送到的分区的分区标识符(默认为散列消息键)。类似于JVM生成器的' partitioner.class '设置。
config.Producer.Partitioner = sarama.NewRandomPartitioner

// 回复确认
// Return specifies what channels will be populated. If they are set to true, you must read from the respective channels to prevent deadlock. If, however, this config is used to create a `SyncProducer`, both must be set to true and you shall not read from the channels since the producer does this internally.
// Return指定将填充哪些通道。如果将它们设置为true,则必须从相应的通道读取以防止死锁。然而,如果这个配置被用来创建一个“SyncProducer”,两者都必须设置为true,并且你不能从通道中读取,因为生产者内部会这样做。
config.Producer.Return.Successes = true

构造消息。使用创建的配置信息构造消息,这是关键代码,是读者可能会经常修改的地方。构造消息就像是创建文章或者添加评论一样,只需要提供标题Topic和内容Value即可。

// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "message"
msg.Value = sarama.StringEncoder("【DAL科技】您正在注册DAL科技信息短信平台账号,验证码是:888888,3分钟内有效,请及时输入。")
连接kafka。有了消息以后,我们就需要连接kafka发送消息。此时我们可以把kafka看成是一个快递小哥,我们要发送的消息就是我们的快递,kafka负责帮我们取走快递,并邮寄出去。不同的是,我们不需要关心到底邮寄到哪里,取快递的人会自己从快递小哥那里取走。
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"192.168.18.101:9092"}, config)
if err != nil {
   fmt.Println("创建生产者失败:", err)
}
defer client.Close()

发送消息。有了消息内容,又连接了kafka以后,就可以发送消息了。发送消息的返回值中,pid是partition分区的ID,offset是kafka消费者在对应分区上已经消费的消息数。

// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
   fmt.Println("发送消息失败:", err)
   return
}
fmt.Printf("pid:%v offset:%v \n ", pid, offset)

1.1.5 消费者源码解析

创建新的消费者。创建消费者不需要特殊的配置,只需要指定IP地址和端口号,连接kafka就可以进行消费。

//创建新的消费者
consumer, err := sarama.NewConsumer([]string{"192.168.18.101:9092"}, nil)
if err != nil {
   fmt.Println("创建消费者失败:", err)
}

根据topic获取分区列表。生产者在进行生产的时候,消息有topic标题和value内容,这里就是根据生产者的topic标题获取该topic下的分区列表。
//根据topic获取所有的分区列表
partitionList, err := consumer.Partitions("message")
if err != nil {
   fmt.Println("根据topic获取分区列表失败:", err)
}
fmt.Println(partitionList)

遍历所有的分区进行消费。得到分区列表以后,我们就可以遍历所有的分区,分别取出每个分区的消息进行消费。

//遍历所有的分区
for p := range partitionList {
   //针对每一个分区创建一个对应分区的消费者
   pc, err := consumer.ConsumePartition("message", int32(p), sarama.OffsetNewest)
   if err != nil {
      fmt.Printf("消费者消费指定分区失败: 分区 %d, 错误:%v\n", p, err)
   }
   defer pc.AsyncClose()
   wg.Add(1)

   //异步从每个分区消费信息
   go func(sarama.PartitionConsumer) {
      for msg := range pc.Messages() {
         fmt.Printf("partition:%d offset:%d key:%v value:%s \n",
            msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
   }(pc)
}
wg.Wait()