https://github.com/Shopify/sarama is a pure-Go Kafka client and a great resource for gophers learning Kafka. Frankly, sarama's code organization is poor: dozens of source files are crammed into a single flat directory, leaving newcomers with no obvious entry point. Here is a partial listing:
```
examples
mocks
tools                                   // Kafka client tools built on the library
tools/kafka-producer-performance
tools/kafka-console-producer
tools/kafka-console-partitionconsumer
tools/kafka-console-consumer
vagrant                                 // config for starting test VMs
acl_xx.go                               // ACL/permission logic
add_partitions_to_txn_response.go
add_partitions_to_txn_request.go
add_offsets_to_txn_response.go
add_offsets_to_txn_request.go
admin.go
alter_xx.go                             // "alter" request logic
async_producer.go
balance_strategy.go
broker.go
client.go
config.go
consumer_group.go
consumer_group_members.go
consumer.go
create_xx.go
fetch_request.go
delete_xx.go
describe_xx.go
list_xx.go
offset_xx.go
partitioner.go
sarama.go
sync_producer.go
produce_request.go
produce_response.go
utils.go
```
In fact, we only need to focus on a handful of files:
```
admin.go
async_producer.go
broker.go
client.go
consumer_group.go
consumer.go
sync_producer.go
```
As always, let's start with an example.
Producer
```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Build the producer config.
	config := sarama.NewConfig()
	// Required ack level for produced messages: NoResponse (0), WaitForLocal (1),
	// or WaitForAll (-1). Note: WaitForAll can fail with
	// "kafka server: Replication-factor is invalid." if the topic's
	// replication factor doesn't support it.
	config.Producer.RequiredAcks = sarama.NoResponse
	// Deliver successfully produced messages back on the Successes channel.
	config.Producer.Return.Successes = true
	// Pick the target partition at random.
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	// Build the message.
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	msg.Value = sarama.StringEncoder("123")

	// Connect to Kafka.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Print(err)
		return
	}
	defer producer.Close()

	// Send the message.
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Println(err)
		return
	}
	fmt.Println(partition, " ", offset)
}
```
1. Creating a producer: sarama.NewSyncProducer

The code lives in sync_producer.go:
```go
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
	if err := verifyProducerConfig(config); err != nil {
		return nil, err
	}
	p, err := NewAsyncProducer(addrs, config)
	if err != nil {
		return nil, err
	}
	return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
```
It first validates the config, then calls NewAsyncProducer to build a producer and wraps it in a syncProducer:
```go
type syncProducer struct {
	producer *asyncProducer
	wg       sync.WaitGroup
}
```
So a syncProducer is essentially an asyncProducer; the synchronous behavior is layered on top using a sync.WaitGroup.
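To make the sync-over-async idea concrete, here is a minimal, self-contained sketch of the same pattern. It is not sarama's actual code (roughly speaking, the real syncProducer attaches an expectation channel to each ProducerMessage); all names here are invented for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// msg carries a payload plus a channel on which its result is delivered —
// the same idea as the expectation sarama attaches to each ProducerMessage.
type msg struct {
	payload string
	done    chan error
}

// miniAsync stands in for asyncProducer: a single goroutine drains input.
type miniAsync struct {
	input chan *msg
	wg    sync.WaitGroup
}

func newMiniAsync() *miniAsync {
	a := &miniAsync{input: make(chan *msg)}
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for m := range a.input {
			// Pretend to ship the payload over the network, then report the result.
			m.done <- nil
		}
	}()
	return a
}

// send is the "sync" wrapper: enqueue, then block until the result arrives.
func (a *miniAsync) send(payload string) error {
	m := &msg{payload: payload, done: make(chan error, 1)}
	a.input <- m
	return <-m.done
}

// close shuts down the pipeline; the WaitGroup guarantees the drain
// goroutine has fully exited before close returns.
func (a *miniAsync) close() {
	close(a.input)
	a.wg.Wait()
}

func main() {
	a := newMiniAsync()
	fmt.Println(a.send("hello")) // <nil>
	a.close()
}
```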
NewAsyncProducer is implemented in async_producer.go:
```go
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}
```
It first creates a Client. A Client wraps the connections to the Kafka brokers; both producers and consumers talk to the brokers through it. The code is in client.go:
```go
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Broker returns the active Broker if available for the broker ID.
	Broker(brokerID int32) (*Broker, error)

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are offline
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshBrokers takes a list of addresses to be used as seed brokers.
	// Existing broker connections are closed and the updated list of seed brokers
	// will be used for the next metadata fetch.
	RefreshBrokers(addrs []string) error

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}
```
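It's worth seeing this interface in action before moving on. The sketch below is a standalone program (assuming a broker at localhost:9092, as in the first example) that uses only methods from the interface above to walk the cluster metadata:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close() // clients are NOT garbage-collected; Close is mandatory

	topics, err := client.Topics()
	if err != nil {
		log.Fatal(err)
	}
	for _, topic := range topics {
		partitions, err := client.Partitions(topic)
		if err != nil {
			log.Fatal(err)
		}
		for _, p := range partitions {
			// Leader is the same lookup that partitionProducer.dispatch
			// (shown later) relies on to route messages.
			leader, err := client.Leader(topic, p)
			if err != nil {
				log.Fatal(err)
			}
			fmt.Printf("%s/%d -> leader %s\n", topic, p, leader.Addr())
		}
	}
}
```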
Then it creates an asyncProducer object:
```go
type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}
```
One of its fields is a transactionManager:
```go
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}
```
After the producer object is created, two goroutines are started:
```go
func newAsyncProducer(client Client) (AsyncProducer, error) {
	...
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
}
```
Pay particular attention to the producer's input field:
```go
input, successes, retries chan *ProducerMessage
```
The dispatcher goroutine continuously drains input and forwards each message to the matching topicProducer's input channel. So to produce messages, all we ever need to do is keep writing to the producer's input channel.
```go
func (p *asyncProducer) dispatcher() {
	for msg := range p.input {
		handler := p.newTopicProducer(msg.Topic)
		handler <- msg
	}
}
```
This happens in two steps:

1. Get the topicProducer; this returns the topicProducer's input channel.
2. Send the message into that channel (a standalone sketch of this fan-out pattern follows the list).
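Before looking at topicProducer itself, here is a self-contained sketch of the fan-out pattern, stripped of everything Kafka-specific. All names are invented for illustration; like sarama's real dispatcher, it lazily creates one channel plus worker goroutine per topic:

```go
package main

import (
	"fmt"
	"sync"
)

type message struct {
	topic string
	value string
}

func main() {
	input := make(chan *message)
	var inFlight sync.WaitGroup // tracks unhandled messages, like asyncProducer.inFlight

	// Dispatcher: lazily create one channel + worker goroutine per topic, then route.
	go func() {
		handlers := make(map[string]chan *message)
		for msg := range input {
			handler, ok := handlers[msg.topic]
			if !ok {
				handler = make(chan *message, 16)
				handlers[msg.topic] = handler
				// Per-topic worker, playing the role of topicProducer.dispatch.
				go func(topic string, in <-chan *message) {
					for m := range in {
						fmt.Printf("[%s] %s\n", topic, m.value)
						inFlight.Done()
					}
				}(msg.topic, handler)
			}
			handler <- msg
		}
	}()

	for _, m := range []*message{{"test", "123"}, {"logs", "456"}} {
		inFlight.Add(1)
		input <- m
	}
	inFlight.Wait() // both messages have been handled
	close(input)
}
```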
```go
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}
```
Each topicProducer starts a goroutine of its own:
```go
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	...
	go withRecover(tp.dispatch)
	return input
}
```
Its dispatch method looks much the same: it forwards each incoming message to a partitionProducer.
```go
func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		handler := tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
		handler <- msg
	}
}
```
Next, what does the partitionProducer do?
```go
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}
```
```go
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	...
	go withRecover(pp.dispatch)
	return input
}
```
That's right: it also starts a goroutine and returns an input channel for receiving messages. Here is what its dispatch actually does:
```go
func (pp *partitionProducer) dispatch() {
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

	for msg := range pp.input {
		pp.brokerProducer.input <- msg
	}
}
```
Yes, it's the same pattern again: it obtains a brokerProducer and pushes each msg into the brokerProducer's input channel. getBrokerProducer is keyed on the partition's leader, because messages are always produced to a partition's leader; the leader is looked up from the cluster metadata that Kafka itself stores, which we'll cover in detail later.
```go
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	...
	bp = p.newBrokerProducer(broker)
	...
}
```
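The elided body caches one brokerProducer per broker in the brokers map and reference-counts it via brokerRefs (both visible in the asyncProducer struct earlier), so partitionProducers whose leaders coincide share a single handler. Here is a minimal sketch of that refcounted-cache pattern, with invented names:

```go
package main

import (
	"fmt"
	"sync"
)

// pool hands out one shared worker per key and reference-counts it,
// mirroring asyncProducer's brokers / brokerRefs / brokerLock trio.
type pool struct {
	mu      sync.Mutex
	workers map[string]*worker
	refs    map[*worker]int
}

type worker struct{ key string }

func newPool() *pool {
	return &pool{workers: map[string]*worker{}, refs: map[*worker]int{}}
}

func (p *pool) get(key string) *worker {
	p.mu.Lock()
	defer p.mu.Unlock()
	w, ok := p.workers[key]
	if !ok {
		w = &worker{key: key} // first user creates the shared worker
		p.workers[key] = w
	}
	p.refs[w]++
	return w
}

func (p *pool) release(w *worker) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.refs[w]--
	if p.refs[w] == 0 { // last user tears it down
		delete(p.refs, w)
		delete(p.workers, w.key)
	}
}

func main() {
	p := newPool()
	a, b := p.get("broker-1"), p.get("broker-1")
	fmt.Println(a == b) // true: both callers share one worker
	p.release(a)
	p.release(b)
}
```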
```go
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}
	stopchan  chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}
```
The brokerProducer likewise starts two goroutines:
```go
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	...
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()
			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})
	...
}
```
run is a loop that keeps consuming messages from input, buffering them into batches in preparation for a request to Kafka:
```go
func (bp *brokerProducer) run() {
	for {
		select {
		case msg, ok := <-bp.input:
			...
			bp.buffer.add(msg)
			...
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		}
		...
	}
}
```
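Note the `case output <- bp.buffer` branch: output is a local variable that only points at a real channel once the buffer is ready to flush. A send on a nil channel blocks forever, so that select case is effectively switched off the rest of the time. A standalone sketch of the nil-channel trick:

```go
package main

import "fmt"

func main() {
	input := make(chan int)
	flush := make(chan []int)

	// Batcher: accumulate from input; the flush case is only enabled
	// by pointing output at the real channel once a batch is ready.
	go func() {
		var buffer []int
		var output chan []int // nil: a send on nil blocks forever, case disabled
		for {
			select {
			case n := <-input:
				buffer = append(buffer, n)
				if len(buffer) == 3 { // batch ready: switch the flush case on
					output = flush
				}
			case output <- buffer: // only selectable once output != nil
				buffer = nil
				output = nil // switch it back off until the next full batch
			}
		}
	}()

	for i := 1; i <= 3; i++ {
		input <- i
	}
	fmt.Println("flushed:", <-flush) // flushed: [1 2 3]
}
```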
The second goroutine does three things: build the request, send it to the broker, and hand the response back.
```go
func (ps *produceSet) buildRequest() *ProduceRequest {
	...
	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			...
			req.AddBatch(topic, partition, rb)        // record-batch (v2) path
			...
			req.AddMessage(topic, partition, compMsg) // legacy message-set path
			...
		}
	}
	...
}
```
The Message type is defined in message.go:
```go
type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	LogAppendTime    bool             // the used timestamp is LogAppendTime
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)

	compressedCache []byte
	compressedSize  int // used for computing the compression ratio metrics
}
```
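The Codec and CompressionLevel fields are populated from the producer config. For example, to produce compressed batches with the standard sarama options:

```go
package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()
	// Compress batches on the wire (Gzip, Snappy, LZ4, and ZSTD are available).
	config.Producer.Compression = sarama.CompressionSnappy
	// CompressionLevel only applies to codecs that have levels, e.g. gzip.
	config.Producer.CompressionLevel = sarama.CompressionLevelDefault
	// ...then pass config to NewSyncProducer / NewAsyncProducer as usual.
}
```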
Then the message is actually sent:
```go
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	...
	response = new(ProduceResponse)
	err = b.sendAndReceive(request, response)
	...
}

func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	...
	promise, err := b.send(req, res != nil, responseHeaderVersion)
	...
}

func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
	...
	bytes, err := b.write(buf)
	...
}
```
which ultimately writes the bytes to the TCP connection:
```go
func (b *Broker) write(buf []byte) (n int, err error) {
	...
	return b.conn.Write(buf)
}
```
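What travels over that connection is the Kafka wire protocol, in which every request and response is framed as a 4-byte big-endian length followed by the encoded payload. A toy sketch of the framing (illustrative only, not sarama's real encoder):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// frame prefixes a payload with its length as a 4-byte big-endian integer,
// which is how Kafka requests and responses are delimited on the wire.
func frame(payload []byte) []byte {
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf[:4], uint32(len(payload)))
	copy(buf[4:], payload)
	return buf
}

func main() {
	msg := frame([]byte("encoded request body"))
	fmt.Printf("% x\n", msg[:4]) // 00 00 00 14 (length 20)

	// The reader side peels the length off first, then reads that many bytes.
	var size uint32
	r := bytes.NewReader(msg)
	binary.Read(r, binary.BigEndian, &size)
	body := make([]byte, size)
	r.Read(body)
	fmt.Println(string(body))
}
```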
That is the complete path the data travels.
2. Sending a message: producer.SendMessage
```go
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
	...
	sp.producer.Input() <- msg
	...
}
```
SendMessage itself is then very simple: it just drops the message into the producer's Input channel.
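If you don't need the blocking behavior, you can work at this level yourself via NewAsyncProducer, writing to Input() and reading results from the Successes() and Errors() channels. A minimal sketch, assuming the same local broker as in the first example:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // otherwise Successes() stays silent

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("123"),
	}

	// The result comes back on one of two channels instead of a return value.
	select {
	case msg := <-producer.Successes():
		fmt.Println("stored at partition/offset:", msg.Partition, msg.Offset)
	case err := <-producer.Errors():
		log.Println("send failed:", err)
	}
}
```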
That is sarama's produce path. To summarize, the core flow is:
syncProducer -> asyncProducer -> topicProducer -> partitionProducer -> brokerProducer
A message flows down through these objects' input channels and is finally written to Kafka over a TCP connection.