kafka sarama提供producer及consumer相关客户端建立及使用,本文主要研究sarama如何保证客户端使用时的长连接。
一:客户端初始化
NewClient()函数用于完成客户端初始化,更新metadata数据,供consumer和producer使用。其中metadata包含topic下所有partition,主partition信息及主partition所在broker信息。
NewClient()函数主要完成以下工作:
- 根据Config与addrs生成新的client结构体,结构体参数如下:
type client struct {
conf *Config
closer, closed chan none n
seedBrokers []*Broker
deadSeeds []*Broker
controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
metadataTopics map[string]none // topics that need to collect metadata
}
- 后台不停刷新kafka server集群的metadata信息,Metadata.RefreshFrequency默认为10分钟
- 若broker集群处于leader选举期间连不上,默认重试3次,每次等待250ms
- config中keepAlive字段,表示连接broker的保活间隔
二:cosumer客户端
consumer客户端主动探测kafka server信息以保证连接的有效性。
kafka 消费客户端源码解析(golang版)详细的介绍了consumer客户端如何消费kafka server中消息,其中ConsumerPartition函数中调用dispatch完成连接异常工作处理。
go dispatcher()函数,监听trigger,每次trigger代表一次消费连接异常,异常发生后,默认每2秒重连一次,直到重连成功
func (child *partitionConsumer) dispatcher() {
for range child.trigger {
select {
case <-child.dying:
close(child.trigger)
case <-time.After(child.conf.Consumer.Retry.Backoff):
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.broker = nil
}
Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
}
}
}
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
}
child.consumer.removeChild(child)
close(child.feeder)
}
三:producer客户端
对于producer,以异步消息发送为例,若成功则返回success,发送失败会进行发送重试,多次重试失败后,消息不再处理。
以异步发送NewAsyncProducerFromClient()函数为例,介绍producer消息发送过程。
1.消息传递过程中分别调用asyncProducer、topicProducer、partitionProducer、brokerProducer对象。其结构分别为:
type asyncProducer struct {
client Client
conf *Config
brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
/***********省略************/
}
type topicProducer struct {
parent *asyncProducer
topic string
input <-chan *ProducerMessage
breaker *breaker.Breaker
handlers map[int32]chan<- *ProducerMessage
partitioner Partitioner
}
type partitionProducer struct {
parent *asyncProducer
topic string
partition int32
input <-chan *ProducerMessage
leader *Broker
breaker *breaker.Breaker
output chan<- *ProducerMessage
highWatermark int
retryState []partitionRetryState
}
type brokerProducer struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
buffer *produceSet
}
由以上结构体可知,topicProducer、partitionProducer、brokerProducer的parent都是asyncProducer。
2.对象调用的过程实际是根据topic把生产的消息异步分派到各个handler。具体的消息传递过程为asyncProducer.dispatcher() -> topicProducer.dispatch() -> partitionProducer.dispatch() -> brokerProducer() -> produceSet()。
最终消息由broker.Producer发送,broker为partitionProducer依据客户端metadata选择的leader broker,表实际kafka连接。发送失败默认3次重试。
func (p *asyncProducer) dispatcher() {
//topicProducer创建
handler = p.newTopicProducer(msg.Topic)
}
func (tp *topicProducer) dispatch() {
//partitionProducer创建
handler = tp.parent.newPartitionProducer()
}
func (pp *partitionProducer) dispatch() {
//brokerProducer创建
pp.output = pp.parent.getBrokerProducer(pp.leader)
}
func (bp *brokerProducer) run() {
//produceSet channel建立
var output chan<- *produceSet
}
//Broker.send()函数发送具体的request请求
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
buf, err := encode(req, b.conf.MetricRegistry)
bytes, err := b.conn.Write(buf)
return &promise, nil
}
3.连接关闭在sender.Close()函数中发生,同时返回msg发送失败的error信息
func (p *asyncProducer) Close() error {
p.AsyncClose()
if p.conf.Producer.Return.Successes {
go withRecover(func() {
for range p.successes {
}
})
}
var errors ProducerErrors
if p.conf.Producer.Return.Errors {
for event := range p.errors {
errors = append(errors, event)
}
} else {
<-p.errors
}
if len(errors) > 0 {
return errors
}
return nil
}
总结:
- client负责定期刷新metadata数据,供consumer与producer连接使用。
- 对于consumer:sarama提供不停重连的机制,能保证kafka server宕机重启,网络波动,网络断开重连等场景下保持正常消费。
- 对于producer:在异步生产消息的时候,若成功则返回success,若3次重试后仍然失败,则从通道中返回失败,并且该条消息不会再处理。
文章同步公众号,欢迎扫码关注