kafka golang

基于 sarama 封装, 目的在于使用简洁及优化 运行模式为 消费组的方式

支持功能

  1. 客户端连接池,可复用连接
  2. 通过build可构建多个消息主题发送及主题消费
  3. 默认使用消费者组模式

使用

1. 消费者

通过build 可同时构建多个需要的业务消费,指定不同的参数传入

var (
 addr    = "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092"
 topic   = "web_log"
 groupId = "testGroupId"
)

//使用阻塞模式
var consumerFactory = kafka_go.NewFactoryConsumer()
//使用非阻塞模式
// var consumerFactory = kafka_go.NewFactoryConsumerBackGround()

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
//注册消费回调监听
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	fmt.Printf("%+v\n", context.GetMessageString())
})

//注册消费构建(多个以参数区分)
consumerFactory.RegisterConsumer(demoConsumerBuilder)

//运行消费者
consumerFactory.Run()

使用方式

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
groupdIdBuildConsumer
//一消费多协程

//消费构建
demo2ConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId2, Conf.Topic2)
demo2ConsumerBuilder.SetDebug(true)
demo2ConsumerBuilder.SetMultiplePartition(true) //开启多协程消费 默认为false
demo2ConsumerBuilder.SetIsAutoCommit(false)//开启多协程消费,此配置失效
demo2ConsumerBuilder.SetKafkaVersion("3.0.0")
demo2ConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
		nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
		fmt.Printf("%s xx received2 [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
		context.GetSession().Ack()
})
//一消费一协程
demoConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId3, Conf.Topic3)
demoConsumerBuilder.SetDebug(true)
demoConsumerBuilder.SetIsAutoCommit(false)// 开启手动提交
demoConsumerBuilder.SetKafkaVersion("3.0.0")
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
	fmt.Printf("%s xx received [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
	//fmt.Printf("%+v\n", context.GetMessageString())
	context.GetSession().Ack()
})
默认为trueConsumerMessageContext.getSession().Ack()

type ConsumerResponseListener func(context *ConsumerMessageContext)

ConsumerMessageContext : 事件消息上下文

接口名称 说明
GetBuilder() BuildConsumerApi 返回当前执行回调中的build信息
GetGroupId() string 返回当前组
GetTopic() string 返回当前主题
GetPartition() int32 返回当前执行数据的分区
GetOffset() int64 返回当前数据的偏移数
GetMessage() []byte 获取消息主体 字节数组
GetMessageString() string 返回消息主体 字符串
GetTimeStamp() time.Time 返回消息时间
GetVal() *sarama.ConsumerMessage 返回原生消息内容
GetSession() *ConsumerSession 返回当前执行的session
接口名称 说明
Ack() 手动确认消息
IsAutoAck() bool 是否自动提交
GetSession() sarama.ConsumerGroupSession 原生session获取
GetMessage() *sarama.ConsumerMessage 原生消息获取

1. 生产者

通过build 可同时构建多个需要的业务生产者,指定不同的参数传入,默认生使用的是同步提交
底层维护一套连接池,根据设置最大连接数进行设置

var (
    productFactory = kafka_go.NewFactoryProduct()
)
demoBuild := kafka_go.NewBuildProduct("demo1", "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092").SetDebug(true).SetMaxConnection(4)
err := productFactory.Register(demoBuild).Connect()
if err != nil {
    log.Panicln(err)
}
num := 10
var wg = &sync.WaitGroup{}
for i := 0; i < num; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		partition, offset, err := productFactory.Push("demo1", Conf.Topic3, "生产者消息")
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Printf("发送成功  partition:%d, offset:%d", partition, offset)
		}
	}()
		//time.Sleep(time.Second)
}
wg.Wait()

使用方式

接口 说明
SetMaxConnection(maxConnection int32) BuildProductApi 设置连接池数量
SetAckType(ackType ProductAckType) BuildProductApi 设置消息确认方式
SetTransactional(isTransactional bool) BuildProductApi 是否开启事务提交
GetName() string 获取当前build name
GetAddr() string 获取当前服务地址
GetAddrSlice() []string 获取当前服务地址切片
GetMaxConnection() int32 获取当前大连接数
GetConnStrategy() ProductBalanceType 获取连接池加载类型, 默认为轮询
确认类型 说明
PRODUCT_ACK_TYPE_NONAL 不等待 broker 的 ack,这一操作提供了一个最低的延迟
PRODUCT_ACK_TYPE_FOLLOWER 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack
PRODUCT_ACK_TYPE_ALL 等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复, 开启事务后可避免数据重复