我有一个批处理作业,它将数据填充到kafka主题。每条消息都有数据和作业标识符。在消费者方面,我只想阅读属于此工作的消息。在作业完成并消耗完所有消息之后,使用者端必须进行一些后处理。1) 如果保证在作业期间不会生成其他消息,我如何理解作业已完成,并且作业生成的所有消息已被消耗(考虑到多个分区和异步)。2) 如果不能保证在工作期间不会产生其他消息,我相信可以跳过噪音。谢谢
lpwwtiir1#
我这里只谈第一个案子。请注意,这只是一个想法,我自己从未尝试过你可以用 endOffsets() 获取所有分区的最后一个偏移量,然后在每条消息之后对所有分区进行循环,以检查所有当前偏移量是否与结束偏移量匹配。如果一切都是匹配的,那么你已经到达了终点。
endOffsets()
n3ipq98p2#
我假设 job_id 是恒定的。在这种情况下,您可以检查您的消费者关闭,如果 n 随后的投票返回Kafka的空记录。 n 将取决于您的摄取率和消费者调查间隔。
job_id
n
2条答案
按热度按时间lpwwtiir1#
我这里只谈第一个案子。请注意,这只是一个想法,我自己从未尝试过
你可以用
endOffsets()
获取所有分区的最后一个偏移量,然后在每条消息之后对所有分区进行循环,以检查所有当前偏移量是否与结束偏移量匹配。如果一切都是匹配的,那么你已经到达了终点。n3ipq98p2#
我假设
job_id
是恒定的。在这种情况下,您可以检查您的消费者关闭,如果n
随后的投票返回Kafka的空记录。n
将取决于您的摄取率和消费者调查间隔。