我想管理Kafka主题中的偏移量以及数据库,这样如果我想在队列中某个点之后重新处理,我就可以。我该怎么做?提前谢谢。
wf82jlnq1#
给定一个分区信息,您应该能够告诉您的消费者 seekToBeginning 或者 seek 到那个分区的偏移量。消费者记录知道它的主题、分区和偏移量。你可以把这些事实记录在数据库里。但这里的关键是,如果你的主题是分区的。你的数据将按时间顺序排列。因此,如果有两个分区,并且基本上是按姓氏分区,那么字母表的前半部分的名称更改将是连续的,后半部分的名称更改将是连续的,但是如何获得整个系统中名称更改的单一时间顺序视图并不明显。但是,如果记录了数据库中某个特定更改的分区和偏移量,则可以从该点开始查找该分区和偏移量并重新处理流。(如果您只有一个分区,这就变得无关紧要了,但是您的主题或流式体系结构何时/是否需要多个分区需要考虑)退一步从实际问题到理论,我真的不知道你为什么要这样做,因为消费者团体将记录你的承诺抵消Kafka本身,因此,如果你的流处理应用程序崩溃,你可以从你没有担心的地方拿起。如果设置了enable.auto.commit属性,则此消息提交将自动发生;如果调用 commitSync() 关于消费者。或者你正试图使用一个不可变的数据存储(kafka),就像使用一个可变的存储一样,但这只是一个纯粹的推测,基于这样一个事实,你没有真正描述你为什么要做你想做的事情。
seekToBeginning
seek
commitSync()
1条答案
按热度按时间wf82jlnq1#
给定一个分区信息,您应该能够告诉您的消费者
seekToBeginning
或者seek
到那个分区的偏移量。消费者记录知道它的主题、分区和偏移量。你可以把这些事实记录在数据库里。
但这里的关键是,如果你的主题是分区的。你的数据将按时间顺序排列。因此,如果有两个分区,并且基本上是按姓氏分区,那么字母表的前半部分的名称更改将是连续的,后半部分的名称更改将是连续的,但是如何获得整个系统中名称更改的单一时间顺序视图并不明显。
但是,如果记录了数据库中某个特定更改的分区和偏移量,则可以从该点开始查找该分区和偏移量并重新处理流。
(如果您只有一个分区,这就变得无关紧要了,但是您的主题或流式体系结构何时/是否需要多个分区需要考虑)
退一步从实际问题到理论,我真的不知道你为什么要这样做,因为消费者团体将记录你的承诺抵消Kafka本身,因此,如果你的流处理应用程序崩溃,你可以从你没有担心的地方拿起。如果设置了enable.auto.commit属性,则此消息提交将自动发生;如果调用
commitSync()
关于消费者。或者你正试图使用一个不可变的数据存储(kafka),就像使用一个可变的存储一样,但这只是一个纯粹的推测,基于这样一个事实,你没有真正描述你为什么要做你想做的事情。