每分钟输出前n个小时的记录

toe95027  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(270)

对于使用spark结构化流式处理的流,在过去的60分钟内保持包含前n条记录的更新表的最佳方法是什么?我只想每分钟更新一次我的表。我想我可以用 window 函数,将窗口持续时间设置为60分钟,将幻灯片持续时间设置为1分钟。问题是我得到了输出中所有滑动窗口的数据,但我只需要最后一个完全计算(关闭)的窗口。有办法做到这一点吗?或者我应该用另一种方式来解决这个问题(在过去的一个小时里保持几乎实时的排名)?
我的不完整解决方案:

val entityCounts = entities
  .withWatermark("timestamp", "1 minute")
  .groupBy(
    window(col("timestamp"), "60 minutes", "1 minute")
      .as("time_window"),
    col("entity_type"),
    col("entity"))
  .count()
val query = entityCounts.writeStream
  .foreachBatch( { (batchDF, _) =>
    batchDF
      .withColumn(
        "row_number", row_number() over Window
          .partitionBy("entity_type")
          .orderBy(col("count").desc))

      .filter(col("row_number") <= 5)
      .select(
        col("entity_type"),
        col("entity"),
        col("count").as("occurrences"))
      .write
      .cassandraFormat("table", "keyspace")
      .mode("append")
      .save
  })
  .outputMode("append")
  .start()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题