如何从kafka服务器获取主题中的所有消息

xam8gpfp  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(356)

我想从服务器上获取主题开头的所有消息。
前任:
bin/kafka-console-consumer.sh--缩放器localhost:2181 --topic testtopic--从头开始
当使用上面的console命令时,我希望能够从一开始就获取一个主题中的所有消息,但是我不能从一开始就使用java代码来使用一个主题中的所有消息。

llycmphe

llycmphe1#

最简单的方法是启动一个消费者,并将所有消息都排出。现在我不知道您的主题中有多少个分区,以及您是否已经有一个现有的使用者组,但是您有几个选项:
看看这个api:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
1) 如果您已经有一个消费者在同一个消费者组中,并且仍然希望从头开始消费,那么您应该使用 seek 选项,并将组中每个使用者的偏移量设置为0。这将从一开始就消耗掉。
2) 否则,您可以在一个新的消费群中创建一些消费者&您就不必担心seek。
附言:如果你对Kafka有更多的问题,请记得在将来提供更多关于你的设置的细节。很多事情取决于您如何配置您的基础设施&您希望它是什么样的,因此会因情况而异。

lxkprmvk

lxkprmvk2#

TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition); 
consumer.assign(partitions); consumer.seekToBeginning(partitions);
uelo1irk

uelo1irk3#

只要改变消费群体
consumerconfig.group\u id\u config-到新的组id
并设置
自动\u偏移\u重置\u配置-最早
样本代码-

props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
30byixjq

30byixjq4#

您可以使用以下命令获取所有消息:

cd Users/kv/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning --max-messages 100

相关问题