spark结构化流如何确定批量查询的endingoffset?

bfrts1fy  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(265)

我试图理解spark结构化流是如何决定 endingOffsets 当批处理查询设置为“最新”时。如果一个kafka主题持续被写入,并且我运行了一个batch spark结构化流式处理作业来读取前面提到的主题(在作业执行期间继续被写入),那么如何确定最新的位置?根据文件, endingOffsets 是:
批处理查询结束时的结束点,可以是“latest”(仅指最新的),也可以是指定每个topicpartition的结束偏移量的json字符串。在json中,可以使用-1作为偏移量来引用latest,不允许使用-2(earliest)作为偏移量。
基于这一点 endingOffsets 以工作开始的时间为依据?下面是使用批处理查询从主题读取的代码片段。


# Subscribe to 1 topic defaults to the earliest and latest offsets

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题