Producer实例构建

说明: kafka系列笔记的内容,主要参考文献: 《深入理解kafka核心设计与实践原理》

sarama
import (
	"fmt"
	"github.com/Shopify/sarama"
)

var (
	brokerList = "localhost:9092"
	topic      = "localTestTopic"
	config     = initConfig()
	producer   = initProducer()
)

// @title: initConfig
// @description: 初始化生产者配置
func initConfig() (config *sarama.Config) {
	config = sarama.NewConfig()
	// 生产者ACK方式:
	// 	WaitForAll = -1: 需要`min.insync.replicas`的ISR-replicas确认
	// 	NoResponse = 0: 不等待响应
	// 	WaitForLocal = 1: waits for only the local commit to succeed before responding. 不太理解什么是local commit????
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 是否等待成功和失败后的响应,这里需要RequiredAcks != NoResponse
	config.Producer.Return.Successes = true
	config.Producer.Errors = true
	return config
}

// @title: initSyncProducer
// @description: 初始化一个同步生产者
func initSyncProducer() (producer sarama.SyncProducer) {
	producer, err := sarama.NewSyncProducer([]string{brokerList}, config)
	if err != nil {
		panic(err)
	}
	return producer
}

kafka的Producer实例是线程安全的,所以可以在多个线程中共享Producer实例,也可以池化生产者实例供其他线程调用

消息的发送

发送消息代码示例:

// @title: NewMessage
// @description: 生成一个message消息
func NewMessage(value string) *sarama.ProducerMessage {
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(value)
	return msg
}

func testSendMsg() {
	// acks = NoResponse => offset = 0
	partition, offset, err := producer.SendMessage(NewMessage("test message"))
	if err != nil {
		panic(err)
	}
	fmt.Println(partition, offset)
}

发送消息需要初始化一个消息结构体,消息结构体的定义如下:

// 消息的数据结构
type ProducerMessage struct {
	Topic string
	Partition int32

	Key Encoder
	Value Encoder
	
	Headers []RecordHeader
	Metadata interface{}
	Timestamp time.Time
}
Topic:PartitionKeyValueStringEncoder(value)ByteEncoderTimestamp

生产者整体架构

架构模型图

在这里插入图片描述

拦截器

拦截器分为生产者拦截器和消费者拦截器。拦截器在消息经过序列化和分区之前进行,主要可以对消息做一些过滤或者修改消息内容。拦截器往往不是必需的。

官方拦截器主要实现三个方法:

type ProducerInterceptor interface {
	func onSend(*ProducerMessage)     // 在消息序列化和计算分区执行onSend()方法,对消息进行一些额外处理,如增加消息前缀、统计消息数量等
	func onAcknowledgement(*ProducerMessage) // 在消息被应答之前或者发送失败时调用onAcknowledgement()方法
	func close() // 主要用于在关闭拦截器时执行一些资源的清理工作
}
github.com/Shopify/sarama/interceptors.go
type ProducerInterceptor interface {
	OnSend(*ProducerMessage)
}

自定义拦截器示例

// 自定义一个拦截器结果体,实现OnSend()方法
type DefinedInterceptor struct{}

// @title: OnSend
// @descriptor: 在消息值中增加前缀pref-
func (di *DefinedInterceptor) OnSend(message *sarama.ProducerMessage) {
	value, _ := message.Value.(sarama.StringEncoder)
	message.Value = sarama.StringEncoder(fmt.Sprintf("pref-%s", string(value)))
}

// 将拦截器定义到生产者配置
func testInterceptor() {
	config.Producer.Interceptors = []sarama.ProducerInterceptor{&DefinedInterceptor{}}
}

func TestProducer() {
	// 测试拦截器,在发送消息前,将拦截器加在到生产者配置
	testInterceptor()
	testSendMsg()
}


==============消费结果=================
➜  bin kafka-console-consumer --bootstrap-server localhost:9092 --topic localTestTopic
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message

序列化器

消息必须转换成字节数组才能在网络中传播,从客户端发送到kafka服务器,因此序列化是消息发送前的必经之路。

官方序列化器主要实现三个方法:

func configure()   // 配置当前类???不太明白
func serialize() []byte // 序列化操作
func close() // 关闭序列化器
Ecoder
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

func (s StringEncoder) Length() int {
	return len(s)
}

同样golang还支持byteEncoder, 定义如下:

type ByteEncoder []byte

func (b ByteEncoder) Encode() ([]byte, error) {
	return b, nil
}

func (b ByteEncoder) Length() int {
	return len(b)
}

分区器

分区器的作用就是计算消息该发往哪个分区,注意,如果消息本身定义了partition字段指定分区,此时不需要分区器参与;其他情况都会经过分区器计算消息被发往的分区

官方定义中,分区器主要实现两个方法:

func partion() int // 计算消息被发往哪个分区,返回分区编号
func close()

golang的sarama包中,partitioner接口定义了分区器:

type Partitioner interface {
	
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool // sarama中增加这个方法,让接口定义key-> partition的映射关系是否不变
}

自定义分区器示例

// 自定义分区器
type DefinedPartitioner struct{}

// 极端自定义分区器,只发送1号分区
func (dp *DefinedPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return 1, nil
}

func (dp *DefinedPartitioner) RequiresConsistency() bool {
	return false
}

func getPartitioner(topic string) sarama.Partitioner {
	return &DefinedPartitioner{}
}

// 将分区器装配到生产者配置
func testPartitioner() {
	config.Producer.Partitioner = getPartitioner
}

func TestProducer() {
	// 测试拦截器
	testInterceptor()
	// 测试分区器
	testPartitioner()
	testSendMsg()
}

=================生产结果======================
GOROOT=/usr/local/go #gosetup
GOPATH=/Users/liushi/go #gosetup
/usr/local/go/bin/go build -i -o /private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo . #gosetup
/private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo
1 23
1 24
1 25
1 26
1 27
1 28
1 29
1 30
1 31
1 32

Process finished with exit code 0

可以看到,生产的消息都发送到了1号分区

消息累加器

RecordAccumulator
buffer.memoryProducerBatchProducerBatch
ProducerBatchbatch.sizeProducerBatchProducerBatch

Sender线程

ProducerSender

InFlightRequest

InFlightRequestRequestmax.in.flight.requests.per.connection

ProducerBatch => Request => KafkaCluster

Sender(partition, ProducerBatch)
partitionnodebrokerProducerBatchRequestkafka clusterRequestInFlightRequestkafka clusterkafka clusterInFlightRequestProducerBatch

重要的生产者参数

acksmax.request.sizeretriesretry.backoff.mscompression.typeCPU/IOlinger.msProducerBatchProducerBatchlinger.mslinger.msrequest.timeout.msbuffer.memorymax.block.msbatch.sizeProducerBatch