我是新来Kafka和星火。我想知道在批处理模式下spark如何知道从哪个偏移量读取?如果我将“startingoffset”指定为“earliest”,那么我只获取最新的记录,而不是分区中的所有记录。我在两个不同的集群中运行了相同的代码。集群a(本地机器)获取了6条记录,集群b(tst集群-第一次运行)获取了1条记录。
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", broker) \
.option("subscribe", topic) \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest" ) \
.load()
我计划每天运行一次我的批处理,我会得到从昨天运行到当前运行的所有记录吗?在哪里可以看到批量查询的偏移和提交?
1条答案
按热度按时间332nm8kg1#
根据structured streaming+kafka integration指南,偏移量存储在您在
write
批处理作业的一部分。如果不删除检查点文件,则作业将继续从它停止的kafka中读取。如果删除检查点文件,或者第一次运行作业,作业将使用基于选项startingoffset的消息。