流源查询必须使用writestream.start()执行

xurqigkl  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(214)

我有一个结构化的流Dataframe tempDataFrame2 包括 Field1 . 我在试着计算 Field1 . 但是,每当我打字的时候 val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0) 我收到以下错误消息: Queries with streaming sources must be executed with writeStream.start() 下面是代码段:

val tempDataFrame2 = A structured streaming dataframe

// Calculate IQR
val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)

// Filter messages
val tempDataFrame3 = tempDataFrame2.filter("Some working filter")

val query = tempDataFrame2.writeStream.outputMode("append").queryName("table").format("console").start()
query.awaitTermination()

我已经通过这两个链接从so:link1link2。不幸的是,我无法将这些回答与我的问题联系起来。

编辑

在阅读了评论之后,以下是我打算继续的方式:
1) 阅读Kafka主题的所有未提交的偏移量。2) 将它们保存到dataframe变量。3) 停止结构化的流媒体,这样我就不会再读Kafka的主题了。4) 从步骤2)开始处理保存的Dataframe。
但是,现在我不知道该怎么办-
1) 比如如何知道Kafka主题中没有任何其他记录可供使用并停止流式查询?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题