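https://github.com/Shopify/sarama is a Kafka client written in pure Go, and a good resource for gophers learning Kafka. Frankly, sarama's code organization is poor: a dense pile of source files sits in a single directory, which makes it hard to know where to start. Here is a partial listing: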

examples
mocks
tools // Kafka command-line tools built on top of the client
    tools/kafka-producer-performance
    tools/kafka-console-producer
    tools/kafka-console-partitionconsumer
    tools/kafka-console-consumer
vagrant  // configuration files for starting the virtual machine
acl_xx.go // ACL (permission) logic
add_partitions_to_txn_response.go
add_partitions_to_txn_request.go
add_offsets_to_txn_response.go
add_offsets_to_txn_request.go
admin.go
alter_xx.go  // alter (modification) logic
async_producer.go
balance_strategy.go
broker.go
client.go
config.go
consumer_group.go
consumer_group_members.go
consumer.go
create_xx.go
fetch_request.go
delete_xx.go
describe_xx.go
list_xx.go
offset_xx.go
partitioner.go
sarama.go
sync_producer.go
produce_request.go
produce_response.go
utils.go

In practice we only need to focus on the following files:

admin.go
async_producer.go
broker.go
client.go
consumer_group.go
consumer.go
sync_producer.go

As usual, let's start from an example:

Producer

package main

import (
  "fmt"
  "log"

  "github.com/Shopify/sarama"
)

func main() {
  // Build the producer configuration.
  config := sarama.NewConfig()
  // Acknowledgement level required from the broker: NoResponse (0), WaitForLocal (1) or WaitForAll (-1).
  config.Producer.RequiredAcks = sarama.NoResponse //sarama.WaitForAll
  //kafka server: Replication-factor is invalid.
  // Report successfully sent messages on the Successes channel (a SyncProducer requires this to be true).
  config.Producer.Return.Successes = true
  // Choose the partitioner that picks a partition for each message.
  config.Producer.Partitioner = sarama.NewRandomPartitioner
  // Build the message.
  msg := &sarama.ProducerMessage{}
  msg.Topic = "test"
  msg.Value = sarama.StringEncoder("123")

  // Connect to Kafka.
  producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  if err != nil {
    log.Print(err)
    return
  }
  defer producer.Close()
  // Send the message; SendMessage returns the partition and the offset it was written to.
  partition, offset, err := producer.SendMessage(msg)
  if err != nil {
    log.Println(err)
    return
  }
  fmt.Println(partition, " ", offset)
}

1. Creating a producer: sarama.NewSyncProducer

The code is in sync_producer.go:

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
  if err := verifyProducerConfig(config); err != nil {
    return nil, err
  }
  p, err := NewAsyncProducer(addrs, config)
  if err != nil {
    return nil, err
  }
  return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}

It first validates the configuration, then calls NewAsyncProducer to create a producer and wraps it in a syncProducer:

type syncProducer struct {
  producer *asyncProducer
  wg       sync.WaitGroup
}

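As you can see, a syncProducer is essentially a wrapper around an asyncProducer. The WaitGroup tracks the helper goroutines that drain the async producer's success and error channels; the synchronous behavior comes from SendMessage waiting for each message's result before it returns.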
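To make the "synchronous wrapper over an asynchronous producer" idea concrete, here is a minimal standard-library sketch. It is not sarama's code: message, asyncSender and syncSender are hypothetical names, and the per-message result channel is just one common way to wait for an asynchronous outcome.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a producer message.
type message struct {
	value  string
	offset int64
	result chan error // receives the outcome of this one message
}

// asyncSender is a hypothetical asynchronous producer: it accepts messages
// on input and reports each outcome on the message's own result channel.
type asyncSender struct {
	input chan *message
}

func newAsyncSender() *asyncSender {
	s := &asyncSender{input: make(chan *message)}
	go func() {
		var next int64
		for m := range s.input {
			if m.value == "" {
				m.result <- errors.New("empty message")
				continue
			}
			m.offset = next // pretend the broker assigned this offset
			next++
			m.result <- nil
		}
	}()
	return s
}

// syncSender wraps asyncSender and blocks until each message is resolved.
type syncSender struct{ async *asyncSender }

func (s *syncSender) Send(value string) (int64, error) {
	m := &message{value: value, result: make(chan error, 1)}
	s.async.input <- m
	if err := <-m.result; err != nil {
		return -1, err
	}
	return m.offset, nil
}

func main() {
	s := &syncSender{async: newAsyncSender()}
	fmt.Println(s.Send("a"))
	fmt.Println(s.Send("b"))
	fmt.Println(s.Send(""))
}
```

The caller only ever sees a blocking Send, while all the work still happens on the asynchronous goroutine.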

NewAsyncProducer is implemented in async_producer.go:

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  client, err := NewClient(addrs, conf)
  if err != nil {
    return nil, err
  }
  return newAsyncProducer(client)
}

It first creates a client. Client wraps the connections to the Kafka brokers; both producers and consumers talk to the brokers through it. The code is in client.go:

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
  // Config returns the Config struct of the client. This struct should not be
  // altered after it has been created.
  Config() *Config

  // Controller returns the cluster controller broker. It will return a
  // locally cached value if it's available. You can call RefreshController
  // to update the cached value. Requires Kafka 0.10 or higher.
  Controller() (*Broker, error)

  // RefreshController retrieves the cluster controller from fresh metadata
  // and stores it in the local cache. Requires Kafka 0.10 or higher.
  RefreshController() (*Broker, error)

  // Brokers returns the current set of active brokers as retrieved from cluster metadata.
  Brokers() []*Broker

  // Broker returns the active Broker if available for the broker ID.
  Broker(brokerID int32) (*Broker, error)

  // Topics returns the set of available topics as retrieved from cluster metadata.
  Topics() ([]string, error)

  // Partitions returns the sorted list of all partition IDs for the given topic.
  Partitions(topic string) ([]int32, error)

  // WritablePartitions returns the sorted list of all writable partition IDs for
  // the given topic, where "writable" means "having a valid leader accepting
  // writes".
  WritablePartitions(topic string) ([]int32, error)

  // Leader returns the broker object that is the leader of the current
  // topic/partition, as determined by querying the cluster metadata.
  Leader(topic string, partitionID int32) (*Broker, error)

  // Replicas returns the set of all replica IDs for the given partition.
  Replicas(topic string, partitionID int32) ([]int32, error)

  // InSyncReplicas returns the set of all in-sync replica IDs for the given
  // partition. In-sync replicas are replicas which are fully caught up with
  // the partition leader.
  InSyncReplicas(topic string, partitionID int32) ([]int32, error)

  // OfflineReplicas returns the set of all offline replica IDs for the given
  // partition. Offline replicas are replicas which are offline
  OfflineReplicas(topic string, partitionID int32) ([]int32, error)

  // RefreshBrokers takes a list of addresses to be used as seed brokers.
  // Existing broker connections are closed and the updated list of seed brokers
  // will be used for the next metadata fetch.
  RefreshBrokers(addrs []string) error

  // RefreshMetadata takes a list of topics and queries the cluster to refresh the
  // available metadata for those topics. If no topics are provided, it will refresh
  // metadata for all topics.
  RefreshMetadata(topics ...string) error

  // GetOffset queries the cluster to get the most recent available offset at the
  // given time (in milliseconds) on the topic/partition combination.
  // Time should be OffsetOldest for the earliest available offset,
  // OffsetNewest for the offset of the message that will be produced next, or a time.
  GetOffset(topic string, partitionID int32, time int64) (int64, error)

  // Coordinator returns the coordinating broker for a consumer group. It will
  // return a locally cached value if it's available. You can call
  // RefreshCoordinator to update the cached value. This function only works on
  // Kafka 0.8.2 and higher.
  Coordinator(consumerGroup string) (*Broker, error)

  // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
  // in local cache. This function only works on Kafka 0.8.2 and higher.
  RefreshCoordinator(consumerGroup string) error

  // InitProducerID retrieves information required for Idempotent Producer
  InitProducerID() (*InitProducerIDResponse, error)

  // Close shuts down all broker connections managed by this client. It is required
  // to call this function before a client object passes out of scope, as it will
  // otherwise leak memory. You must close any Producers or Consumers using a client
  // before you close the client.
  Close() error

  // Closed returns true if the client has already had Close called on it
  Closed() bool
}

It then creates an asyncProducer object:

type asyncProducer struct {
  client Client
  conf   *Config

  errors                    chan *ProducerError
  input, successes, retries chan *ProducerMessage
  inFlight                  sync.WaitGroup

  brokers    map[*Broker]*brokerProducer
  brokerRefs map[*brokerProducer]int
  brokerLock sync.Mutex

  txnmgr *transactionManager
}

transactionManager is one of its members:

type transactionManager struct {
  producerID      int64
  producerEpoch   int16
  sequenceNumbers map[string]int32
  mutex           sync.Mutex
}

After creating the producer object, it starts two goroutines:

func newAsyncProducer(client Client) (AsyncProducer, error) {
  // ... (abridged: builds the asyncProducer p)
  go withRecover(p.dispatcher)
  go withRecover(p.retryHandler)
  return p, nil
}
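Every goroutine above is started through withRecover. As a rough illustration of what such a helper can look like, here is a minimal sketch; panicHandler is a hypothetical hook name, and the body is an assumption about the general pattern rather than a copy of sarama's implementation.

```go
package main

import "fmt"

// panicHandler is a hypothetical hook. When it is nil, panics propagate
// as usual; when it is set, panics raised inside fn are passed to it.
var panicHandler func(interface{})

// withRecover runs fn and routes any panic to panicHandler.
func withRecover(fn func()) {
	defer func() {
		if panicHandler != nil {
			if err := recover(); err != nil {
				panicHandler(err)
			}
		}
	}()
	fn()
}

func main() {
	done := make(chan struct{})
	panicHandler = func(v interface{}) {
		fmt.Println("recovered:", v)
		close(done)
	}
	// The goroutine panics, but the process keeps running.
	go withRecover(func() { panic("boom") })
	<-done
}
```

Wrapping every background goroutine this way keeps a bug in one stage from crashing the whole process when a handler is installed.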

Pay particular attention to the producer's input member:

input, successes, retries chan *ProducerMessage

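The dispatcher goroutine keeps consuming messages from input and forwards each one to the input channel of the matching topicProducer. So to send messages, we only need to keep writing them to the producer's input channel.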

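func (p *asyncProducer) dispatcher() {
    handlers := make(map[string]chan<- *ProducerMessage)
    for msg := range p.input {
        handler := handlers[msg.Topic]
        if handler == nil {
            // first message for this topic: start its topicProducer
            handler = p.newTopicProducer(msg.Topic)
            handlers[msg.Topic] = handler
        }
        handler <- msg
    }
}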

There are two steps here:

1. Get the topicProducer for the message's topic; this returns the topicProducer's input channel.

2. Send the message into that channel.
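The two steps above can be sketched with the standard library alone. This is an illustration of the pattern, not sarama's code: msg and dispatch are hypothetical names, and each per-topic goroutine merely counts messages instead of forwarding them further.

```go
package main

import (
	"fmt"
	"sync"
)

type msg struct {
	topic string
	value string
}

// dispatch reads from input and forwards each message to a per-topic
// goroutine, starting that goroutine the first time a topic is seen.
// It returns how many messages each topic handler processed.
func dispatch(input <-chan msg) map[string]int {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		counts   = make(map[string]int)
		handlers = make(map[string]chan<- msg)
	)
	for m := range input {
		h := handlers[m.topic]
		if h == nil {
			// Step 1: create the handler and keep its input channel.
			ch := make(chan msg, 4)
			wg.Add(1)
			go func(topic string) {
				defer wg.Done()
				for range ch {
					mu.Lock()
					counts[topic]++
					mu.Unlock()
				}
			}(m.topic)
			handlers[m.topic] = ch
			h = ch
		}
		// Step 2: send the message into that channel.
		h <- m
	}
	// Input is exhausted: stop every handler and wait for them to drain.
	for _, h := range handlers {
		close(h)
	}
	wg.Wait()
	return counts
}

func main() {
	input := make(chan msg)
	go func() {
		defer close(input)
		for _, m := range []msg{{"a", "1"}, {"b", "2"}, {"a", "3"}} {
			input <- m
		}
	}()
	counts := dispatch(input)
	fmt.Println(counts["a"], counts["b"])
}
```

Caching the channel in a map means a handler goroutine is started once per key, not once per message.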

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
  parent *asyncProducer
  topic  string
  input  <-chan *ProducerMessage

  breaker     *breaker.Breaker
  handlers    map[int32]chan<- *ProducerMessage
  partitioner Partitioner
}

Each topicProducer likewise starts a goroutine:

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
   input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
   tp := &topicProducer{parent: p, topic: topic, input: input /* ... (abridged) */}
   go withRecover(tp.dispatch)
   return input
}

The dispatch method looks very similar: it forwards each message it receives to a partitionProducer:

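func (tp *topicProducer) dispatch() {
     for msg := range tp.input {
          // ... (abridged: the partitioner sets msg.Partition)
          handler := tp.handlers[msg.Partition]
          if handler == nil {
               handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
               tp.handlers[msg.Partition] = handler
          }
          handler <- msg
     }
}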

Next, let's see what partitionProducer does:

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
  parent    *asyncProducer
  topic     string
  partition int32
  input     <-chan *ProducerMessage

  leader         *Broker
  breaker        *breaker.Breaker
  brokerProducer *brokerProducer

  // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
  // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
  // retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
  // therefore whether our buffer is complete and safe to flush)
  highWatermark int
  retryState    []partitionRetryState
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  pp := &partitionProducer{parent: p, topic: topic, partition: partition, input: input /* ... (abridged) */}
  go withRecover(pp.dispatch)
  return input
}

Sure enough, it also starts a goroutine and returns an input channel for receiving messages. Let's look at how dispatch is implemented:

func (pp *partitionProducer) dispatch() {
   pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
   pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
   pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
   for msg := range pp.input {
     pp.brokerProducer.input <- msg
   }
}

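Yes, you read that right: it likewise obtains a brokerProducer and sends msg to the brokerProducer's input channel. getBrokerProducer depends on the partition's leader, because messages are always sent to the leader of the partition. The leader is looked up from the metadata stored in Kafka, which will be covered in detail later.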

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
  bp := p.brokers[broker] // reuse the brokerProducer for this broker, if any (locking abridged)
  if bp == nil {
    bp = p.newBrokerProducer(broker)
    // ... (abridged: stored in p.brokers, reference-counted in p.brokerRefs)
  }
  return bp
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
  parent *asyncProducer
  broker *Broker

  input     chan *ProducerMessage
  output    chan<- *produceSet
  responses <-chan *brokerProducerResponse
  abandoned chan struct{}
  stopchan  chan struct{}

  buffer     *produceSet
  timer      <-chan time.Time
  timerFired bool

  closing        error
  currentRetries map[string]map[int32]error
}

brokerProducer likewise starts two goroutines:

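func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
  var (
    input     = make(chan *ProducerMessage)
    bridge    = make(chan *produceSet)
    responses = make(chan *brokerProducerResponse)
  )
  bp := &brokerProducer{parent: p, broker: broker, input: input, output: bridge, responses: responses /* ... (abridged) */}
  go withRecover(bp.run)
  // minimal bridge to make the network response `select`able
  go withRecover(func() {
    for set := range bridge {
      request := set.buildRequest()

      response, err := broker.Produce(request)

      responses <- &brokerProducerResponse{
        set: set,
        err: err,
        res: response,
      }
    }
    close(responses)
  })
  // ... (abridged)
  return bp
}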
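The "bridge" goroutine exists because a blocking call cannot appear as a case in a select statement, whereas a channel receive can. Here is a minimal standard-library sketch of that idea; slowCall, result and bridge are hypothetical names, with slowCall standing in for the blocking network round trip.

```go
package main

import (
	"fmt"
	"strings"
)

// result pairs a request with the outcome of the blocking call.
type result struct {
	req  string
	resp string
}

// slowCall is a hypothetical stand-in for a blocking network round trip.
func slowCall(req string) string { return strings.ToUpper(req) }

// bridge runs slowCall on its own goroutine so that callers can wait for
// results with select instead of blocking inside the call itself.
func bridge(requests <-chan string) <-chan result {
	results := make(chan result)
	go func() {
		for req := range requests {
			results <- result{req: req, resp: slowCall(req)}
		}
		close(results) // lets the consumer detect shutdown
	}()
	return results
}

func main() {
	requests := make(chan string)
	results := bridge(requests)
	go func() {
		requests <- "ping"
		close(requests)
	}()
	for r := range results {
		fmt.Println(r.req, "->", r.resp)
	}
}
```

Closing the results channel when the request channel is drained mirrors the close(responses) call in the excerpt above.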

run is a loop that keeps consuming messages from input and prepares the requests to Kafka:

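func (bp *brokerProducer) run() {
  var output chan<- *produceSet // nil until the buffer is ready to be flushed
  for {
    select {
    case msg, ok := <-bp.input:
      if !ok {
        bp.shutdown()
        return
      }
      bp.buffer.add(msg) // abridged: error handling omitted
    case output <- bp.buffer:
      bp.rollOver()
    case response, ok := <-bp.responses:
      if ok {
        bp.handleResponse(response)
      }
    }
    // enable the send case only when there is something worth flushing
    if bp.timerFired || bp.buffer.readyToFlush() {
      output = bp.output
    } else {
      output = nil
    }
  }
}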
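The run loop above buffers messages and hands the buffer over only when a send becomes possible. A common way to express this in Go is to toggle a channel variable between nil and a real channel, since a select case on a nil channel is never chosen. The sketch below shows that idiom in isolation; batcher and its size parameter are hypothetical, and unlike the real loop it stops reading input while a full batch is waiting, to keep the output deterministic.

```go
package main

import "fmt"

// batcher collects items from input and hands batches of size items to
// output. The send case is disabled (nil channel) until a batch is ready.
func batcher(input <-chan int, output chan<- []int, size int) {
	defer close(output)
	var (
		buffer []int
		out    chan<- []int // nil: a send on a nil channel never proceeds
		in     = input
	)
	for {
		select {
		case v, ok := <-in:
			if !ok {
				if len(buffer) > 0 {
					output <- buffer // flush the final partial batch
				}
				return
			}
			buffer = append(buffer, v)
		case out <- buffer:
			buffer = nil // start a fresh batch
		}
		if len(buffer) >= size {
			out, in = output, nil // batch full: only flushing is enabled
		} else {
			out, in = nil, input // batch not full: only reading is enabled
		}
	}
}

func main() {
	input := make(chan int)
	output := make(chan []int)
	go batcher(input, output, 3)
	go func() {
		for i := 1; i <= 7; i++ {
			input <- i
		}
		close(input)
	}()
	for b := range output {
		fmt.Println(b)
	}
}
```

With seven inputs and a batch size of three, this yields two full batches followed by one partial batch.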

The second goroutine does three things: it builds the request, sends it, and passes back the result.

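func (ps *produceSet) buildRequest() *ProduceRequest {
  req := &ProduceRequest{ /* ... (abridged) */ }
  for topic, partitionSets := range ps.msgs {
    for partition, set := range partitionSets {
      if req.Version >= 3 {
        rb := set.recordsToSend.RecordBatch // abridged
        req.AddBatch(topic, partition, rb)
        continue
      }
      // older protocol versions use legacy message sets (abridged: compMsg is built here)
      req.AddMessage(topic, partition, compMsg)
    }
  }
  return req
}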

Message is defined in message.go:

type Message struct {
  Codec            CompressionCodec // codec used to compress the message contents
  CompressionLevel int              // compression level
  LogAppendTime    bool             // the used timestamp is LogAppendTime
  Key              []byte           // the message key, may be nil
  Value            []byte           // the message contents
  Set              *MessageSet      // the message set a message might wrap
  Version          int8             // v1 requires Kafka 0.10
  Timestamp        time.Time        // the timestamp of the message (version 1+ only)

  compressedCache []byte
  compressedSize  int // used for computing the compression ratio metrics
}

Next comes the actual send:

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  response := new(ProduceResponse)
  err := b.sendAndReceive(request, response)
  // ... (abridged)
}

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
  promise, err := b.send(req, res != nil, responseHeaderVersion)
  // ... (abridged: waits on promise and decodes the reply into res)
}

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  // ... (abridged: encodes the request into buf)
  bytes, err := b.write(buf)
  // ... (abridged)
}

The data is then written to the TCP connection:

func (b *Broker) write(buf []byte) (n int, err error) {
   return b.conn.Write(buf)
}

That is the complete path the data travels.

2. Sending a message: producer.SendMessage

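func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
  // ... (abridged)
  sp.producer.Input() <- msg
  // ... (abridged: blocks until the result for msg arrives, then returns its partition and offset)
}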

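SendMessage is straightforward: it pushes the message into the channel returned by producer.Input() and then waits for the result.

That is sarama's message production flow. To summarize, the core pipeline is: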

syncProducer->asyncProducer->topicProducer->partitionProducer->brokerProducer

Messages flow downward through the input channels of these objects and are finally sent to Kafka over a TCP connection.