公司决定使用kafka来作为新一代的消息队列来使用,于是开始对kafka的机制,原理,go客户端的使用,各种了解了一番,过程中也遇到了不少的坑,特地写出来,和大家分享,也供自己参考,加深印象。

首先,kafka的设计思想,各个角色比如broker,producer,consumer,partition等等还有与它们相关的配置,这里就先不作介绍了,官方文档都有,文章后面也会提到。

附上kafka官方文档链接: 
http://kafka.apachecn.org/documentation.html

客户端选择: 
go连接kafka的客户端不多,综合对比了下,决定使用sarama 
“go get github.com/Shopify/sarama”

生产者:

func SaramaProducer()  {

    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()

    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for{
            select {
            case  <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)

    var value string
    for i:=0;;i++ {
        time.Sleep(500*time.Millisecond)
        time11:=time.Now()
        value = "this is a message 0606 "+time11.Format("15:04:05")

        // 发送的消息,主题。 
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "0606_test",
        }

        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)

        //使用通道发送
        producer.Input() <- msg
    }
}

这里使用的是异步producer,kafka的producer有个特点是,批次发送,这么做的好处就是,可以提高吞吐量,所以我们在看几个主流的消息队列性能测试对比的时候,kafka的吞吐量是遥遥领先的。 
producer还有个特性是,就是每发送一次消息,都会要求broker返回一个消息回执,即ack。如果ack没有收到,producer会进行重发,如果设置了重发次数的话。这个ack有三种模式:

// The level of acknowledgement reliability needed from the broker (defaults
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
// 等同于jvm kafka中的`request.required.acks` 
        RequiredAcks RequiredAcks

type RequiredAcks int16
const (
// 第一个模式,NoResponse doesn't send any response, the TCP ACK is all you get.
    NoResponse RequiredAcks = 0
//第二个模式, WaitForLocal waits for only the local commit to succeed before responding.
    WaitForLocal RequiredAcks = 1
// 第三个模式,WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
    WaitForAll RequiredAcks = -1
)


如果RequiredAcks设置为0,在这种情况下,服务器是否收到请求是没法保证的,并且参数retries(重发)也不会生效(因为客户端无法获得失败信息)。既然提到了重发,可以看一下下面sarama的重发定义:

Retry struct {
            // The total number of times to retry sending a message (default 3).
            // Similar to the `message.send.max.retries` setting of the JVM producer.
            Max int
            // How long to wait for the cluster to settle between retries
            // (default 100ms). Similar to the `retry.backoff.ms` setting of the
            // JVM producer.
            Backoff time.Duration
        }

消费者:

func SaramaConsumer()  {

    fmt.Println("start consume")
    config := sarama.NewConfig()

    //提交offset的间隔时间,每秒提交一次给kafka
    config.Consumer.Offsets.CommitInterval = 1 * time.Second

    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    config.Version = sarama.V0_10_0_1

//consumer新建的时候会新建一个client,这个client归属于这个consumer,并且这个client不能用作其他的consumer
    consumer, err := sarama.NewConsumer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic(err)
    }

//新建一个client,为了后面offsetManager做准备
    client, err := sarama.NewClient([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if err != nil {
        panic("client create error")
    }
    defer client.Close()

//新建offsetManager,为了能够手动控制offset
    offsetManager,err:=sarama.NewOffsetManagerFromClient("group111",client)
    if err != nil {
        panic("offsetManager create error")
    }
    defer offsetManager.Close()

//创建一个第2分区的offsetManager,每个partition都维护了自己的offset
    partitionOffsetManager,err:=offsetManager.ManagePartition("0606_test",2)
    if err != nil {
        panic("partitionOffsetManager create error")
    }
    defer partitionOffsetManager.Close()


    fmt.Println("consumer init success")

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    //sarama提供了一些额外的方法,以便我们获取broker那边的情况
    topics,_:=consumer.Topics()
    fmt.Println(topics)
    partitions,_:=consumer.Partitions("0606_test")
    fmt.Println(partitions)

//第一次的offset从kafka获取(发送OffsetFetchRequest),之后从本地获取,由MarkOffset()得来
    nextOffset,_:=partitionOffsetManager.NextOffset()
    fmt.Println(nextOffset)

//创建一个分区consumer,从上次提交的offset开始进行消费
    partitionConsumer, err := consumer.ConsumePartition("0606_test", 2, nextOffset+1)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    fmt.Println("start consume really")

ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n message:%s", msg.Offset,string(msg.Value))
            //拿到下一个offset
            nextOffset,offsetString:=partitionOffsetManager.NextOffset()
            fmt.Println(nextOffset+1,"...",offsetString)
            //提交offset,默认提交到本地缓存,每秒钟往broker提交一次(可以设置)
            partitionOffsetManager.MarkOffset(nextOffset+1,"modified metadata")

        case <-signals:
            break ConsumerLoop
        }
    }
}


至此,一个初步的consumer构建好了,很多关于consumer的内容见上面代码的注释。可以根据consumer.Partitions(“topic”)来获取这个topic的所有分区,然后为每个分区构建一个consumer,然后进行消费。

这样挺麻烦的,也不够优雅,其实kafka的consumer还有个很重要的机制,就是consumer group,可惜sarama并不支持,不过有另一个开源的库,叫做”github.com/bsm/sarama-cluster”,它是在sarama上加了一层封装,支持了consumer group。