Kafka-读取一包消息

xoshrz7s  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(366)

我有一个批处理作业,它将数据填充到kafka主题。每条消息都有数据和作业标识符。在消费者方面,我只想阅读属于此工作的消息。在作业完成并消耗完所有消息之后,使用者端必须进行一些后处理。
1) 如果保证在作业期间不会生成其他消息,我如何理解作业已完成,并且作业生成的所有消息已被消耗(考虑到多个分区和异步)。
2) 如果不能保证在工作期间不会产生其他消息,我相信可以跳过噪音。
谢谢

lpwwtiir

lpwwtiir1#

我这里只谈第一个案子。请注意,这只是一个想法,我自己从未尝试过
你可以用 endOffsets() 获取所有分区的最后一个偏移量,然后在每条消息之后对所有分区进行循环,以检查所有当前偏移量是否与结束偏移量匹配。如果一切都是匹配的,那么你已经到达了终点。

n3ipq98p

n3ipq98p2#

我假设 job_id 是恒定的。在这种情况下,您可以检查您的消费者关闭,如果 n 随后的投票返回Kafka的空记录。 n 将取决于您的摄取率和消费者调查间隔。

相关问题