我想从服务器上获取主题开头的所有消息。前任:bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic testtopic--从头开始当使用上面的console命令时,我希望能够从一开始就获取一个主题中的所有消息,但是我不能从一开始就使用java代码来使用一个主题中的所有消息。
llycmphe1#
最简单的方法是启动一个消费者,并将所有消息都排出。现在我不知道您的主题中有多少个分区,以及您是否已经有一个现有的使用者组,但是您有几个选项:看看这个api:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html1) 如果您已经有一个消费者在同一个消费者组中,并且仍然希望从头开始消费,那么您应该使用 seek 选项,并将组中每个使用者的偏移量设置为0。这将从一开始就消耗掉。2) 否则,您可以在一个新的消费群中创建一些消费者&您就不必担心seek。附言:如果你对Kafka有更多的问题,请记得在将来提供更多关于你的设置的细节。很多事情取决于您如何配置您的基础设施&您希望它是什么样的,因此会因情况而异。
seek
lxkprmvk2#
TopicPartition topicPartition = new TopicPartition(topic, 0); List<TopicPartition> partitions = Arrays.asList(topicPartition); consumer.assign(partitions); consumer.seekToBeginning(partitions);
uelo1irk3#
只要改变消费群体consumerconfig.group\u id\u config-到新的组id并设置自动\u偏移\u重置\u配置-最早样本代码-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
30byixjq4#
您可以使用以下命令获取所有消息:
cd Users/kv/kafka/bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning --max-messages 100
4条答案
按热度按时间llycmphe1#
最简单的方法是启动一个消费者,并将所有消息都排出。现在我不知道您的主题中有多少个分区,以及您是否已经有一个现有的使用者组,但是您有几个选项:
看看这个api:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
1) 如果您已经有一个消费者在同一个消费者组中,并且仍然希望从头开始消费,那么您应该使用
seek
选项,并将组中每个使用者的偏移量设置为0。这将从一开始就消耗掉。2) 否则,您可以在一个新的消费群中创建一些消费者&您就不必担心seek。
附言:如果你对Kafka有更多的问题,请记得在将来提供更多关于你的设置的细节。很多事情取决于您如何配置您的基础设施&您希望它是什么样的,因此会因情况而异。
lxkprmvk2#
uelo1irk3#
只要改变消费群体
consumerconfig.group\u id\u config-到新的组id
并设置
自动\u偏移\u重置\u配置-最早
样本代码-
30byixjq4#
您可以使用以下命令获取所有消息: