我想将一个Kafka主题作为一个批处理来使用,在这个批处理中,我希望每小时阅读一次Kafka主题,并阅读最新的每小时数据。
val readStream = existingSparkSession
.read
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "kafka.raw")
.load()
但它总是读取前20个数据行,并且这些行是从一开始就开始的,所以它从不选择最新的数据行。
如何使用scala和spark按小时读取最新的行?
1条答案
按热度按时间gkl3eglg1#
如果你在批处理模式下阅读Kafka的信息,你需要注意簿记哪些数据是新的,哪些不是你自己。请记住,spark不会将任何消息提交回kafka,因此每次重新启动批处理作业时,它都会从头开始读取(或根据设置)
startingOffsets
默认为earliest
用于批处理查询。对于希望每小时运行一次作业并只处理前一小时到达kafka的新数据的场景,可以使用writestream触发器选项
Trigger.Once
用于流式查询。databricks有一个不错的博客,很好地解释了为什么流式查询
Trigger.Once
应优先于批处理查询。要点是:
“当您运行执行增量更新的批处理作业时,通常必须弄清楚哪些数据是新的,哪些应该处理,哪些不应该处理。结构化流媒体已经为您完成了所有这些。”
确保在writestream中也设置了“checkpointlocation”选项。最后,您可以有一个简单的cron作业,每小时提交一次流作业。