版本:GoLang 1.10.2 卡夫卡 4.4.1 码头工人 18.03.1

我正在尝试使用 Shopify 的 Sarama 包来测试我的 Kafka 实例。我使用 Docker compose 来支持 Kafka/Zookeeper,它都可以成功运行。

当我尝试使用 Sarama 创建 Producer 客户端时,会引发错误。

当我运行以下内容时

    package main

import (
"fmt"
"log"
"os"
"os/signal"
"time"

"strconv"

"github.com/Shopify/sarama"

)

func main() {


// Setup configuration
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:29092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
    // Should not reach here
    panic(err)
}

defer func() {
    if err := producer.Close(); err != nil {
        // Should not reach here
        panic(err)
    }
}()

我明白了

[sarama] 2018/06/12 17:22:05 初始化新客户端

[sarama] 2018/06/12 17:22:05 客户端/元数据从代理 localhost:29092 获取所有主题的元数据

[sarama] 2018/06/12 17:22:05 在 localhost:29092 连接到代理(未注册)

[sarama] 2018/06/12 17:22:05 客户端/元数据在获取元数据时从代理收到错误:EOF

[sarama] 2018/06/12 17:22:05 关闭与代理 localhost:29092 的连接

{sarama] 2018/06/12 17:22:05 客户端/元数据没有可用的代理发送元数据请求到

[sarama] 2018/06/12 17:22:06 关闭客户端 恐慌:kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)

goroutine 1 [运行中]: main.main() /Users/benwornom/go/src/github.com/acstech/doppler-events/testprod/main.go:29 +0x3ec 退出状态2

Sarama 确实连续多次尝试创建生产者客户端,但每次都失败。

我对Sarama的“NewAsyncProducer”方法的理解是,它调用了“NewClient”,不管你是创建Producer还是Consumer,都会调用它。 NewClient 尝试从 Kafka 代理收集元数据,这在我的情况下失败了。我知道它正在连接到 Kafka 代理,但是一旦连接,它似乎就中断了。任何意见将是有益的。我的网络连接很强,我想不出任何干扰服务器的东西。据我所知,现有主题只有一个代理和一个分区。我认为我不必手动将主题分配给经纪人。如果我的客户正在与代理连接,为什么我不能为我的生产者建立持久连接?

这是来自 kafka 日志文件,就在它死之前。

__consumer_offsets-5 -> Vector(1), connect-offsets-23 -> Vector(1), __consumer_offsets-43 -> Vector(1), __consumer_offsets-32 -> Vector(1), __consumer_offsets-21 -> Vector(1), __consumer_offsets-10 -> Vector(1), connect-offsets-20 -> Vector(1), __consumer_offsets-37 -> Vector(1), connect-offsets-9 -> Vector(1), 连接-status-4 -> Vector(1), __consumer_offsets-48 -> Vector(1), __consumer_offsets-40 -> Vector(1), __consumer_offsets-29 -> Vector(1), __consumer_offsets-18 -> Vector(1) , connect-offsets-14 -> Vector(1), __consumer_offsets-7 -> Vector(1), __consumer_offsets-34 -> Vector(1), __consumer_offsets-45 -> Vector(1), __consumer_offsets-23 -> Vector( 1)、connect-offsets-6 -> Vector(1)、connect-status-1 -> Vector(1)、connect-offsets-17 -> Vector(1)、connect-offsets-0 -> Vector(1) , connect-offsets-22 -> Vector(1), __consumer_offsets-26 -> Vector(1), connect-offsets-11 -> Vector(1), __consumer_offsets-15 -> Vector(1), __consumer_offsets-4 ->向量(1),__consumer_offsets-42 -> 向量( 1), __consumer_offsets-9 -> Vector(1), __consumer_offsets-31 -> Vector(1), __consumer_offsets-20 -> Vector(1), connect-offsets-3 -> Vector(1), __consumer_offsets-1 -> Vector(1), __consumer_offsets-12 -> Vector(1), connect-offsets-8 -> Vector(1), connect-offsets-19 -> Vector(1), connect-status-3 -> Vector(1) , __confluent.support.metrics-0 -> Vector(1), __consumer_offsets-17 -> Vector(1), __consumer_offsets-28 -> Vector(1), __consumer_offsets-6 -> Vector(1), __consumer_offsets-39 -> Vector(1), __consumer_offsets-44 -> Vector(1), connect-offsets-16 -> Vector(1), connect-status-0 -> Vector(1), connect-offsets-5 -> Vector(1) , connect-offsets-21 -> Vector(1), __consumer_offsets-47 -> Vector(1), __consumer_offsets-36 -> Vector(1), __consumer_offsets-14 -> Vector(1), __consumer_offsets-25 -> Vector( 1), __consumer_offsets-3 -> Vector(1), __consumer_offsets-30 -> Vector(1), __consumer_offsets-41 -> Vector(1), connect-offsets-13 -> Vector(1), connect-offsets-24 -> 向量(1),连接偏移量 2 -> Vector(1)、connect-configs-0 -> Vector(1)、__consumer_offsets-11 -> Vector(1)、__consumer_offsets-22 -> Vector(1)、__consumer_offsets-33 -> Vector(1)、__consumer_offsets-0 -> Vector(1), connect-offsets-7 -> Vector(1), connect-offsets-18 -> Vector(1))) (kafka.controller.KafkaController) [36mkafka_1 |[0m [2018-06-12 20:24:47,461] 调试 [控制器 ID = 1] 主题不在代理 1 Map() 的首选副本中 (kafka.controller.KafkaController) [36mkafka_1 |[0m [2018-06-12 20:24:47,462] TRACE [Controller id=1] broker 1 的领导者不平衡率为 0.0 (kafka.controller.KafkaController)