kafka使用者总是给出java.nio.channels.closedchannelexception

hjzp0vay  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(350)

我正在尝试执行以下命令:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 101.10.51.1:9092,101.10.51.4:9092 --topic namespace_deep_archive_d_billing_transaction --time -2

程序总是会出现以下错误:

[2018-08-23 12:36:58,604] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(0,101.10.51.1,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:124)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:99)
        at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:98)
        at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
[2018-08-23 12:36:59,616] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(1,101.10.51.4,9092)] failed (kafka.client.ClientUtils$)

我在管理 getOffset 从其他服务器。但是,这个服务器可以telnet到kafka代理。
如果有人面临这个问题,你是如何解决的?

pes8fvy9

pes8fvy91#

我检查了getoffsetshell并将问题追溯到 /etc/hosts 文件。
这是getoffsetshell.scala中的代码段

val url = new URI(options.valueOf(urlOpt))
    val topic = options.valueOf(topicOpt)
    val partition = options.valueOf(partitionOpt).intValue
    var time = options.valueOf(timeOpt).longValue
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)

即使我传递了代理的ip地址,它们也会被解析为相应的主机名。消费者代码对 /etc/hosts 找不到主机名和ip地址之间的Map,引发异常。
在/etc/hosts中添加服务器名和ip后,代码现在可以从kafka代理获取偏移量和使用记录。
参考文献:https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/getoffsetshell.scalahttps用法:/github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/simpleconsumer.scalahttps://github.com/spujadas/elk-docker/issues/54

相关问题