kafka sarama提供producer及consumer相关客户端建立及使用,本文主要研究sarama如何保证客户端使用时的长连接。
一:客户端初始化
NewClient()函数用于完成客户端初始化,更新metadata数据,供consumer和producer使用。其中metadata包含topic下所有partition,主partition信息及主partition所在broker信息。
NewClient()函数主要完成以下工作:

  • 根据Config与addrs生成新的client结构体,结构体参数如下:

type client struct {
conf *Config
closer, closed chan none n 
seedBrokers []*Broker
deadSeeds []*Broker
 
controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
metadataTopics map[string]none // topics that need to collect metadata
}
  • 后台不停刷新kafka server集群的metadata信息,Metadata.RefreshFrequency默认为10分钟
  • 若broker集群处于leader选举期间连不上,默认重试3次,每次等待250ms
  • config中keepAlive字段,表示连接broker的保活间隔

二:cosumer客户端
consumer客户端主动探测kafka server信息以保证连接的有效性。
kafka 消费客户端源码解析(golang版)详细的介绍了consumer客户端如何消费kafka server中消息,其中ConsumerPartition函数中调用dispatch完成连接异常工作处理。
go dispatcher()函数,监听trigger,每次trigger代表一次消费连接异常,异常发生后,默认每2秒重连一次,直到重连成功

func (child *partitionConsumer) dispatcher() {
   for range child.trigger {
      select {
      case <-child.dying:
         close(child.trigger)
      case <-time.After(child.conf.Consumer.Retry.Backoff):
         if child.broker != nil {
            child.consumer.unrefBrokerConsumer(child.broker)
            child.broker = nil
         }
 
         Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
         if err := child.dispatch(); err != nil {
            child.sendError(err)
            child.trigger <- none{}
         }
      }
   }
 
   if child.broker != nil {
      child.consumer.unrefBrokerConsumer(child.broker)
   }
   child.consumer.removeChild(child)
   close(child.feeder)
}

三:producer客户端
对于producer,以异步消息发送为例,若成功则返回success,发送失败会进行发送重试,多次重试失败后,消息不再处理。
以异步发送NewAsyncProducerFromClient()函数为例,介绍producer消息发送过程。
1.消息传递过程中分别调用asyncProducer、topicProducer、partitionProducer、brokerProducer对象。其结构分别为:

type asyncProducer struct {
   client    Client
   conf      *Config
   brokers    map[*Broker]*brokerProducer
   brokerRefs map[*brokerProducer]int
   /***********省略************/
}

type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker
	output  chan<- *ProducerMessage
	highWatermark int
	retryState    []partitionRetryState
}

type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     <-chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse

	buffer     *produceSet
}

由以上结构体可知,topicProducer、partitionProducer、brokerProducer的parent都是asyncProducer。

2.对象调用的过程实际是根据topic把生产的消息异步分派到各个handler。具体的消息传递过程为asyncProducer.dispatcher() -> topicProducer.dispatch() -> partitionProducer.dispatch() -> brokerProducer() -> produceSet()。
最终消息由broker.Producer发送,broker为partitionProducer依据客户端metadata选择的leader broker,表实际kafka连接。发送失败默认3次重试。

func (p *asyncProducer) dispatcher() {
//topicProducer创建
 handler = p.newTopicProducer(msg.Topic)
}

func (tp *topicProducer) dispatch() {
//partitionProducer创建
handler = tp.parent.newPartitionProducer()
}

func (pp *partitionProducer) dispatch() {
//brokerProducer创建
pp.output = pp.parent.getBrokerProducer(pp.leader)
}

func (bp *brokerProducer) run() {
//produceSet channel建立
	var output chan<- *produceSet
}
//Broker.send()函数发送具体的request请求
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
    buf, err := encode(req, b.conf.MetricRegistry)
	bytes, err := b.conn.Write(buf)
	return &promise, nil
}

3.连接关闭在sender.Close()函数中发生,同时返回msg发送失败的error信息

func (p *asyncProducer) Close() error {
   p.AsyncClose()
 
   if p.conf.Producer.Return.Successes {
      go withRecover(func() {
         for range p.successes {
         }
      })
   }
 
   var errors ProducerErrors
   if p.conf.Producer.Return.Errors {
      for event := range p.errors {
         errors = append(errors, event)
      }
   } else {
      <-p.errors
   }
 
   if len(errors) > 0 {
      return errors
   }
   return nil
}

总结:

  • client负责定期刷新metadata数据,供consumer与producer连接使用。
  • 对于consumer:sarama提供不停重连的机制,能保证kafka server宕机重启,网络波动,网络断开重连等场景下保持正常消费。
  • 对于producer:在异步生产消息的时候,若成功则返回success,若3次重试后仍然失败,则从通道中返回失败,并且该条消息不会再处理。

文章同步公众号,欢迎扫码关注
在这里插入图片描述