kafka 0.9.0.1 java消费者卡在等待metadataupdate()中

sirbozc5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(210)

我试图让一个简单的kafka消费者使用javaapi v0.9.0.1工作。我使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。以下是消费者代码:

public class Consumer {
    public static void main(String[] args) throws IOException {

        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            consumer = new KafkaConsumer<>(properties);
        }

        consumer.subscribe(Arrays.asList("messages"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("Message received: " + record.value());
            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            consumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
}

但是,在启动消费者时,它调用上面的poll(100)方法,并且从不返回。在调试过程中,似乎永远无法运行org.apache.kafka.clients.consumer.internals.consumernetworkclient中的以下方法:

public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();

    do {
        this.poll(9223372036854775807L);
    } while(this.metadata.version() == version);

}

(version和this.metadata.version()似乎总是==2)。此外,尽管它没有抛出错误,但从未看到来自java生产者的消息进入队列。我已经验证了使用命令行kafka工具,我可以从队列发送和接收消息。
有人知道这是怎么回事吗?

relj7zay

relj7zay1#

如果这对其他有类似问题的人有帮助,我的解决方案是设置以下环境变量:

ADVERTISED_HOST=localhost
ADVERTISED_PORT=9092

(当然,此处的值可能会更改以适合您的安装)
显然,命令行使用者和生产者脚本可以在不设置这些env变量的情况下找到代理并与之通信,但是javaapi实现不能。也不会抛出错误,只是在尝试更新元数据时,在第一次轮询时抛出一个无限循环。

相关问题