spark结构化流式处理作业问题

k97glaaz  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(323)

我正在尝试使用spark structured streaming api从azure eventhub流式处理数据,azure eventhub的消息保留期为4天,如果我连续流式处理数据(即,不调用任何trigger.once()api),则我的流式处理作业可以正常工作。这里的挑战是,我每天只能获得7-8gb的数据,我正在尝试使用trigger.once()api实现作业,这样我就可以每天运行一次计划作业。
当我尝试在trigger.once()api的帮助下运行作业时,streaming job会插入4-5分钟的数据并自动关闭会话,即使数据在eventhub端可用。如果我再次触发同一个作业,该作业将再次插入4-5分钟的数据并自动关闭会话。

val dataWrite = dtcFinalDF.writeStream.format("JSON").partitionBy("year", "month", "day").option("checkpointLocation", "/mnt/persisted/EventHub_TruckConnectData/TruckConnect_Data_Checkpoint").outputMode(OutputMode.Append).trigger(Trigger.Once()).start("/mnt/persisted/EventHub_TruckConnectData/TruckConnect_Data_Output")

//Process all the available data and close the session

dataWrite.processAllAvailable()
dataWrite.stop()

但是,当我在不调用trigger.once()api的情况下手动运行相同的流作业时,作业会成功地将数据插入blob位置,并且不会间歇性地关闭会话。

dtcFinalDF.writeStream.format("JSON").partitionBy("year", "month", "day").option("checkpointLocation","/mnt/truckconnectdatasample/TruckConnect_Data_Checkpoint").outputMode(OutputMode.Append).start("/mnt/truckconnectdatasample/TruckConnect_Data_Output")

在这里我无法破解的一个问题是,如果我实现trigger.once(),为什么即使数据在源端(azureeventhub)可用,作业也会在4-5分钟后终止。任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题