kafka消费者

r6vfmomb  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(362)

我有一个主题列表(现在是10个),其大小可以在将来增加。我知道我们可以生成多个线程(每个主题)来使用每个主题,但是在我的例子中,如果主题的数量增加,那么从主题中使用的线程的数量就会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将处于理想状态。
有没有办法让一个消费者从所有主题中消费?如果是的话,那我们怎样才能实现呢?Kafka又将如何维持补偿?请给出答案。

lnvxswe2

lnvxswe21#

我们可以使用以下api订阅多个主题:consumer.subscribe(arrays.aslist(topic1,topic2),consumerBalanceListener obj)
使用者有主题信息,我们可以使用consumer.commitsync或consumer.commitsync()创建offsetandmetadata对象,如下所示。

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
ajsxfq5m

ajsxfq5m2#

不需要多个线程,您可以有一个消费者,从多个主题消费。偏移量由zookeeper维护,因为kafka服务器本身是无状态的。每当使用者使用一条消息时,它的偏移量都会与zookeeper一起提交,以便将来跟踪处理每条消息一次。因此,即使在Kafka失败的情况下,消费者将开始消费从下一个最后承诺抵消。

相关问题