我试图理解spark结构化流是如何决定 endingOffsets
当批处理查询设置为“最新”时。如果一个kafka主题持续被写入,并且我运行了一个batch spark结构化流式处理作业来读取前面提到的主题(在作业执行期间继续被写入),那么如何确定最新的位置?根据文件, endingOffsets
是:
批处理查询结束时的结束点,可以是“latest”(仅指最新的),也可以是指定每个topicpartition的结束偏移量的json字符串。在json中,可以使用-1作为偏移量来引用latest,不允许使用-2(earliest)作为偏移量。
基于这一点 endingOffsets
以工作开始的时间为依据?下面是使用批处理查询从主题读取的代码片段。
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
暂无答案!
目前还没有任何答案,快来回答吧!