Producer实例构建
说明: kafka系列笔记的内容,主要参考文献: 《深入理解kafka核心设计与实践原理》
sarama
import (
"fmt"
"github.com/Shopify/sarama"
)
var (
brokerList = "localhost:9092"
topic = "localTestTopic"
config = initConfig()
producer = initProducer()
)
// @title: initConfig
// @description: 初始化生产者配置
func initConfig() (config *sarama.Config) {
config = sarama.NewConfig()
// 生产者ACK方式:
// WaitForAll = -1: 需要`min.insync.replicas`的ISR-replicas确认
// NoResponse = 0: 不等待响应
// WaitForLocal = 1: waits for only the local commit to succeed before responding. 不太理解什么是local commit????
config.Producer.RequiredAcks = sarama.WaitForAll
// 是否等待成功和失败后的响应,这里需要RequiredAcks != NoResponse
config.Producer.Return.Successes = true
config.Producer.Errors = true
return config
}
// @title: initSyncProducer
// @description: 初始化一个同步生产者
func initSyncProducer() (producer sarama.SyncProducer) {
producer, err := sarama.NewSyncProducer([]string{brokerList}, config)
if err != nil {
panic(err)
}
return producer
}
kafka的Producer实例是线程安全的,所以可以在多个线程中共享Producer实例,也可以池化生产者实例供其他线程调用
消息的发送
发送消息代码示例:
// @title: NewMessage
// @description: 生成一个message消息
func NewMessage(value string) *sarama.ProducerMessage {
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(value)
return msg
}
func testSendMsg() {
// acks = NoResponse => offset = 0
partition, offset, err := producer.SendMessage(NewMessage("test message"))
if err != nil {
panic(err)
}
fmt.Println(partition, offset)
}
发送消息需要初始化一个消息结构体,消息结构体的定义如下:
// 消息的数据结构
type ProducerMessage struct {
Topic string
Partition int32
Key Encoder
Value Encoder
Headers []RecordHeader
Metadata interface{}
Timestamp time.Time
}
Topic:PartitionKeyValueStringEncoder(value)ByteEncoderTimestamp
生产者整体架构
架构模型图
拦截器
拦截器分为生产者拦截器和消费者拦截器。拦截器在消息经过序列化和分区之前进行,主要可以对消息做一些过滤或者修改消息内容。拦截器往往不是必需的。
官方拦截器主要实现三个方法:
type ProducerInterceptor interface {
func onSend(*ProducerMessage) // 在消息序列化和计算分区执行onSend()方法,对消息进行一些额外处理,如增加消息前缀、统计消息数量等
func onAcknowledgement(*ProducerMessage) // 在消息被应答之前或者发送失败时调用onAcknowledgement()方法
func close() // 主要用于在关闭拦截器时执行一些资源的清理工作
}
github.com/Shopify/sarama/interceptors.go
type ProducerInterceptor interface {
OnSend(*ProducerMessage)
}
自定义拦截器示例
// 自定义一个拦截器结果体,实现OnSend()方法
type DefinedInterceptor struct{}
// @title: OnSend
// @descriptor: 在消息值中增加前缀pref-
func (di *DefinedInterceptor) OnSend(message *sarama.ProducerMessage) {
value, _ := message.Value.(sarama.StringEncoder)
message.Value = sarama.StringEncoder(fmt.Sprintf("pref-%s", string(value)))
}
// 将拦截器定义到生产者配置
func testInterceptor() {
config.Producer.Interceptors = []sarama.ProducerInterceptor{&DefinedInterceptor{}}
}
func TestProducer() {
// 测试拦截器,在发送消息前,将拦截器加在到生产者配置
testInterceptor()
testSendMsg()
}
==============消费结果=================
➜ bin kafka-console-consumer --bootstrap-server localhost:9092 --topic localTestTopic
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
序列化器
消息必须转换成字节数组才能在网络中传播,从客户端发送到kafka服务器,因此序列化是消息发送前的必经之路。
官方序列化器主要实现三个方法:
func configure() // 配置当前类???不太明白
func serialize() []byte // 序列化操作
func close() // 关闭序列化器
Ecoder
type Encoder interface {
Encode() ([]byte, error)
Length() int
}
type StringEncoder string
func (s StringEncoder) Encode() ([]byte, error) {
return []byte(s), nil
}
func (s StringEncoder) Length() int {
return len(s)
}
同样golang还支持byteEncoder, 定义如下:
type ByteEncoder []byte
func (b ByteEncoder) Encode() ([]byte, error) {
return b, nil
}
func (b ByteEncoder) Length() int {
return len(b)
}
分区器
分区器的作用就是计算消息该发往哪个分区,注意,如果消息本身定义了partition字段指定分区,此时不需要分区器参与;其他情况都会经过分区器计算消息被发往的分区
官方定义中,分区器主要实现两个方法:
func partion() int // 计算消息被发往哪个分区,返回分区编号
func close()
golang的sarama包中,partitioner接口定义了分区器:
type Partitioner interface {
Partition(message *ProducerMessage, numPartitions int32) (int32, error)
RequiresConsistency() bool // sarama中增加这个方法,让接口定义key-> partition的映射关系是否不变
}
自定义分区器示例
// 自定义分区器
type DefinedPartitioner struct{}
// 极端自定义分区器,只发送1号分区
func (dp *DefinedPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
return 1, nil
}
func (dp *DefinedPartitioner) RequiresConsistency() bool {
return false
}
func getPartitioner(topic string) sarama.Partitioner {
return &DefinedPartitioner{}
}
// 将分区器装配到生产者配置
func testPartitioner() {
config.Producer.Partitioner = getPartitioner
}
func TestProducer() {
// 测试拦截器
testInterceptor()
// 测试分区器
testPartitioner()
testSendMsg()
}
=================生产结果======================
GOROOT=/usr/local/go #gosetup
GOPATH=/Users/liushi/go #gosetup
/usr/local/go/bin/go build -i -o /private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo . #gosetup
/private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo
1 23
1 24
1 25
1 26
1 27
1 28
1 29
1 30
1 31
1 32
Process finished with exit code 0
可以看到,生产的消息都发送到了1号分区
消息累加器
RecordAccumulator
buffer.memoryProducerBatchProducerBatch
ProducerBatchbatch.sizeProducerBatchProducerBatch
Sender线程
ProducerSender
InFlightRequest
InFlightRequestRequestmax.in.flight.requests.per.connection
ProducerBatch => Request => KafkaCluster
Sender(partition, ProducerBatch)
partitionnodebrokerProducerBatchRequestkafka clusterRequestInFlightRequestkafka clusterkafka clusterInFlightRequestProducerBatch
重要的生产者参数
acksmax.request.sizeretriesretry.backoff.mscompression.typeCPU/IOlinger.msProducerBatchProducerBatchlinger.mslinger.msrequest.timeout.msbuffer.memorymax.block.msbatch.sizeProducerBatch