spark结构化流在使用streamingquerylistener queryprogressevent时获取不正确的输入行数

5ktev3wc  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(303)

我在使用streamingquerylistener来标识我正在使用的输入行数时遇到了问题 queryProgress.progress().numInputRows() 当除了write之外没有其他操作时,我得到了正确的计数,但是当我添加某些操作(如df.count或df.isempty()时,我的输入行计数就会中断。
非常感谢您的帮助
编辑
以下代码工作

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

这算错了

df.writeStream().outputMode("append").foreachBatch(new VoidFunction2<Dataset<Row>,Long>(){
  @Override
  public void call(Dataset<Row> streamDataset, Long batchId) throws Exception {
    streamDataset.count();                                    
    streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
  }
}).start();

笔记
请忽略write()代码,在实际场景中数据正在写入mysql

6rqinv9w

6rqinv9w1#

当你定义不止一个动作时

streamDataset.count();                                    
streamDataset.write().mode(SaveMode.Append).save("namesAndFavColors.parquet");

spark创建两个“独立”的流,每个流使用相同的数据。但是,两个流都在调用 onQueryProgress . 当这两个流被 Package 到同一个流中时,就会同时发生这种情况 foreachBatch .
因此,在您的特定情况下,您将在您的数据库中看到两倍的数据 NumInputRowscount .
这个系数将根据你所做的动作的数量而增加。

相关问题