我想在写入流接收器(即eventhub)之前引入1小时的延迟。如何做到这一点?我尝试了以下方法:
sinkOutput
.withColumn("Timestamp", current_timestamp())
.withWatermark("Timestamp", "60 minutes")
.trigger(Trigger.ProcessingTime("60 minutes"))
.format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
.option("checkpointLocation", checkpointLocation)
.options(ehConf.toMap) // EventHubsConf containing the destination EventHub connection string.
.start()
但它似乎不起作用。有什么我不知道的吗?
暂无答案!
目前还没有任何答案,快来回答吧!