我有一个结构化的流Dataframe tempDataFrame2
包括 Field1
. 我在试着计算 Field1
. 但是,每当我打字的时候 val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)
我收到以下错误消息: Queries with streaming sources must be executed with writeStream.start()
下面是代码段:
val tempDataFrame2 = A structured streaming dataframe
// Calculate IQR
val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)
// Filter messages
val tempDataFrame3 = tempDataFrame2.filter("Some working filter")
val query = tempDataFrame2.writeStream.outputMode("append").queryName("table").format("console").start()
query.awaitTermination()
我已经通过这两个链接从so:link1link2。不幸的是,我无法将这些回答与我的问题联系起来。
编辑
在阅读了评论之后,以下是我打算继续的方式:
1) 阅读Kafka主题的所有未提交的偏移量。2) 将它们保存到dataframe变量。3) 停止结构化的流媒体,这样我就不会再读Kafka的主题了。4) 从步骤2)开始处理保存的Dataframe。
但是,现在我不知道该怎么办-
1) 比如如何知道Kafka主题中没有任何其他记录可供使用并停止流式查询?
暂无答案!
目前还没有任何答案,快来回答吧!