https://github.com/Shopify/sarama 是一个纯go实现的kafka客户端,是gopher学习kafka一个很好的资料。说实话sarama的代码组织很烂,密密麻麻一堆源码文件都在一个目录,让人无从下手,下面列出了一部分:
examples
mocks
tools //基于客户端,实现的kafka客户端工具
tools/kafka-producer-performance
tools/kafka-console-producer
tools/kafka-console-partitionconsumer
tools/kafka-console-consumer
vagrant //启动虚拟机的配置文件
acl_xx.go //权限相关的逻辑
add_partitions_to_txn_response.go
add_partitions_to_txn_request.go
add_offsets_to_txn_response.go
add_offsets_to_txn_request.go
admin.go
alter_xx.go //修改相关的逻辑
async_producer.go
balance_strategy.go
broker.go
client.go
config.go
consumer_group.go
consumer_group_members.go
consumer.go
create_xx.go
fetch_request.go
delete_xx.go
describe_xx.go
list_xx.go
offset_xx.go
partitioner.go
sarama.go
sync_producer.go
produce_request.go
produce_response.go
utils.go
其实我们重点关注下面几个文件就好了
admin.go
async_producer.go
broker.go
client.go
consumer_group.go
consumer.go
sync_producer.go
还是从例子开始:
生产者
package main
import (
"fmt"
"log"
"github.com/Shopify/sarama"
)
func main() {
// 构建 生产者
// 生成 生产者配置文件
config := sarama.NewConfig()
// 设置生产者 消息 回复等级 0 1 all
config.Producer.RequiredAcks = sarama.NoResponse //sarama.WaitForAll
//kafka server: Replication-factor is invalid.
// 设置生产者 成功 发送消息 将在什么 通道返回
config.Producer.Return.Successes = true
// 设置生产者 发送的分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 构建 消息
msg := &sarama.ProducerMessage{}
msg.Topic = "test"
msg.Value = sarama.StringEncoder("123")
// 连接 kafka
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Print(err)
return
}
defer producer.Close()
// 发送消息
message, offset, err := producer.SendMessage(msg)
if err != nil {
log.Println(err)
return
}
fmt.Println(message, " ", offset)
}
1,创建一个生产者:sarama.NewSyncProducer
代码在sync_producer.go中
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
if err := verifyProducerConfig(config); err != nil {
}
p, err := NewAsyncProducer(addrs, config)
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
先校验参数,然后调用NewAsyncProducer生成一个producer,将它转化成syncProducer
type syncProducer struct
{
producer *asyncProducer
wg sync.WaitGroup
}
可以看到syncProducer 本质上还是一个asyncProducer,通过waitGroup的方式来实现的同步。
NewAsyncProducer的代码实现在async_producer.go中:
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
return newAsyncProducer(client)
}
首先创建了一个client,Client是对kafka broker连接的一个包装,生产者消费者都通过client和kafka broker进行通信的。代码位于client.go中
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
// Config returns the Config struct of the client. This struct should not be
// altered after it has been created.
Config() *Config
// Controller returns the cluster controller broker. It will return a
// locally cached value if it's available. You can call RefreshController
// to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)
// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
RefreshController() (*Broker, error)
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker
// Broker returns the active Broker if available for the broker ID.
Broker(brokerID int32) (*Broker, error)
// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)
// Partitions returns the sorted list of all partition IDs for the given topic.
Partitions(topic string) ([]int32, error)
// WritablePartitions returns the sorted list of all writable partition IDs for
// the given topic, where "writable" means "having a valid leader accepting
// writes".
WritablePartitions(topic string) ([]int32, error)
// Leader returns the broker object that is the leader of the current
// topic/partition, as determined by querying the cluster metadata.
Leader(topic string, partitionID int32) (*Broker, error)
// Replicas returns the set of all replica IDs for the given partition.
Replicas(topic string, partitionID int32) ([]int32, error)
// InSyncReplicas returns the set of all in-sync replica IDs for the given
// partition. In-sync replicas are replicas which are fully caught up with
// the partition leader.
InSyncReplicas(topic string, partitionID int32) ([]int32, error)
// OfflineReplicas returns the set of all offline replica IDs for the given
// partition. Offline replicas are replicas which are offline
OfflineReplicas(topic string, partitionID int32) ([]int32, error)
// RefreshBrokers takes a list of addresses to be used as seed brokers.
// Existing broker connections are closed and the updated list of seed brokers
// will be used for the next metadata fetch.
RefreshBrokers(addrs []string) error
// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
RefreshMetadata(topics ...string) error
// GetOffset queries the cluster to get the most recent available offset at the
// given time (in milliseconds) on the topic/partition combination.
// Time should be OffsetOldest for the earliest available offset,
// OffsetNewest for the offset of the message that will be produced next, or a time.
GetOffset(topic string, partitionID int32, time int64) (int64, error)
// Coordinator returns the coordinating broker for a consumer group. It will
// return a locally cached value if it's available. You can call
// RefreshCoordinator to update the cached value. This function only works on
// Kafka 0.8.2 and higher.
Coordinator(consumerGroup string) (*Broker, error)
// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
// in local cache. This function only works on Kafka 0.8.2 and higher.
RefreshCoordinator(consumerGroup string) error
// InitProducerID retrieves information required for Idempotent Producer
InitProducerID() (*InitProducerIDResponse, error)
// Close shuts down all broker connections managed by this client. It is required
// to call this function before a client object passes out of scope, as it will
// otherwise leak memory. You must close any Producers or Consumers using a client
// before you close the client.
Close() error
// Closed returns true if the client has already had Close called on it
Closed() bool
}
然后创建了一个asyncProducer对象
type asyncProducer struct {
client Client
conf *Config
errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup
brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
brokerLock sync.Mutex
txnmgr *transactionManager
}
transactionManager是它的成员
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}
创建完producer对象后起了两个协程
func newAsyncProducer(client Client) (AsyncProducer, error) {
....
go withRecover(p.dispatcher)
go withRecover(p.retryHandler)
}
重点关注下peoducer的input成员
input, successes, retries chan *ProducerMessage
dispatcher这个协程,不断消费input里面的消息,然后发送给topicProducer的input chanel,这样我们发送消息的时候,值需要不断往peoducer的input里面发送就可以了。
func (p *asyncProducer) dispatcher() {
for msg := range p.input {
handler = p.newTopicProducer(msg.Topic)
handler <- msg
}
}
这里面分两步:
1,获取topicProducer,返回topicProducer的input chanel
2,向这个chanel里发送消息。
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
parent *asyncProducer
topic string
input <-chan *ProducerMessage
breaker *breaker.Breaker
handlers map[int32]chan<- *ProducerMessage
partitioner Partitioner
}
每一个topicProducer同样会起一个协程
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
go withRecover(tp.dispatch)
}
dispatch 方法的内容很相似,把收到的消息转发给partitionProducer
func (tp *topicProducer) dispatch(){
for msg := range tp.input {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
handler <- msg
}
}
接着看下partitionProducer做了什么:
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
parent *asyncProducer
topic string
partition int32
input <-chan *ProducerMessage
leader *Broker
breaker *breaker.Breaker
brokerProducer *brokerProducer
// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
// therefore whether our buffer is complete and safe to flush)
highWatermark int
retryState []partitionRetryState
}
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
go withRecover(pp.dispatch)
return input
}
没错,它同样起了个协程,返回了一个input channel用来接受消息,我们看看dispatch 的具体实现:
func (pp *partitionProducer) dispatch() {
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
for msg := range pp.input {
pp.brokerProducer.input <- msg
}
}
对,你没有看错,它同样创建了一个brokerProducer,然后把msg 发送到了brokerProducer的input channel。getBrokerProducer依赖于这个partitation的leader,发送消息都是发送到partation的leader,获取leader的方式是通过存储在kafka中的元数据得到的,后面会详细介绍。
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
bp = p.newBrokerProducer(broker)
}
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
parent *asyncProducer
broker *Broker
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
stopchan chan struct{}
buffer *produceSet
timer <-chan time.Time
timerFired bool
closing error
currentRetries map[string]map[int32]error
}
brokerProducer同样起了两个协程
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
go withRecover(bp.run)
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
}
run是一个循环,不断从input消费message,为请求kafka做准备。
func (bp *brokerProducer) run() {
for {
select {
case msg, ok := <-bp.input:
bp.buffer.add(msg)
case output <- bp.buffer:
bp.rollOver()
case response, ok := <-bp.responses:
if ok {
bp.handleResponse(response)
}
}
第二个协程,就是做了构建请求,发起请求,传递返回结果三件事情。
func (ps *produceSet) buildRequest() *ProduceRequest {
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
req.AddBatch(topic, partition, rb)
}
}
req.AddMessage(topic, partition, compMsg)
}
message 定义在async_producer.go中
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Version int8 // v1 requires Kafka 0.10
Timestamp time.Time // the timestamp of the message (version 1+ only)
compressedCache []byte
compressedSize int // used for computing the compression ratio metrics
}
接着就是发送消息
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
}
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
promise, err := b.send(req, res != nil, responseHeaderVersion)
}
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
bytes, err := b.write(buf)
}
调用tcp连接发送数据
func (b *Broker) write(buf []byte) (n int, err error) {
return b.conn.Write(buf)
}
上面就是整个数据的传递路径。
2, producer.SendMessage 发送消息
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
sp.producer.Input() <- msg
}
SendMessage 就很简单了,往producer.Input的chanel里扔数据就好了。
上面就是sarama的生产消息流程,总结下,核心流程如下:
syncProducer->topicProducer->partitionProducer->brokerProducer
消息就是沿着这几个对象的input chanel 向下流动,最后通过tcp连接发送给kafka。