kafka消费者的poll()方法被阻止

9avjhtql  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(739)

我是kafka0.9的新手,在测试一些特性时,我意识到java实现的消费者中有一种奇怪的行为( KafkaConsumer ).
Kafka代理位于ambari外部机器中。
即使我可以实现一个生产者并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(poll)时,它会卡住。
我知道producer工作得很好,因为我确实可以通过console consumer(在ambari上本地工作)使用消息。但是当我执行java消费者时,什么也没发生,只是卡住了。调试代码时,我可以看到它在 poll() 生产线:

ConsumerRecords<String, String> records = consumer.poll(100);

顺便说一句,超时没有任何作用。不管您输入0、100或1000毫秒,消费程序都会被阻塞在这一行中,并且不会超时或抛出异常。
我尝试了所有类型的可选属性,例如advised.host.name、advised.listener,。。。以此类推,零运气。
任何帮助都将不胜感激。提前谢谢!

ac1kyiln

ac1kyiln1#

原因可能是运行用户代码的计算机无法连接到zookeeper。试着在安装了kafka的机器上运行相同的用户代码(我试过这个并为我工作)。我还通过在server.properties文件中提到以下属性来解决问题: advertised.host.name="ip address which you want to expose" //在我的情况下,这是公共ip的ec2机器,我有Kafka和zookeeper安装在同一个ec2。
advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); 上述声明并不意味着消费者将在100毫秒后超时,这是投票期。它在100毫秒内捕获的任何数据都被读入记录集合。

zpjtge22

zpjtge222#

在我的例子中,poll()方法最终陷入了无限循环ensurecoordinatorready(),协调器这个词告诉我协调器运行在另一个主机上(出于测试目的,我只在我的/etc/hosts中添加了一个代理主机,而总共有三个代理主机)。所以消费者正确地得到了消费者协调器。
因此解决方案是:在/etc/hosts文件中正确配置运行kafka代理的主机

相关问题