使用spark每小时消耗一个Kafka主题

dy1byipe  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(260)

我想将一个Kafka主题作为一个批处理来使用,在这个批处理中,我希望每小时阅读一次Kafka主题,并阅读最新的每小时数据。

val readStream = existingSparkSession
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "kafka.raw")
  .load()

但它总是读取前20个数据行,并且这些行是从一开始就开始的,所以它从不选择最新的数据行。
如何使用scala和spark按小时读取最新的行?

gkl3eglg

gkl3eglg1#

如果你在批处理模式下阅读Kafka的信息,你需要注意簿记哪些数据是新的,哪些不是你自己。请记住,spark不会将任何消息提交回kafka,因此每次重新启动批处理作业时,它都会从头开始读取(或根据设置) startingOffsets 默认为 earliest 用于批处理查询。
对于希望每小时运行一次作业并只处理前一小时到达kafka的新数据的场景,可以使用writestream触发器选项 Trigger.Once 用于流式查询。
databricks有一个不错的博客,很好地解释了为什么流式查询 Trigger.Once 应优先于批处理查询。
要点是:
“当您运行执行增量更新的批处理作业时,通常必须弄清楚哪些数据是新的,哪些应该处理,哪些不应该处理。结构化流媒体已经为您完成了所有这些。”
确保在writestream中也设置了“checkpointlocation”选项。最后,您可以有一个简单的cron作业,每小时提交一次流作业。

相关问题