如何在spark结构化流foreachbatch方法中实现聚合?

v2g6jxz6  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(381)

我有一个用例,
我需要从数据源读取记录并将其写入多个接收器,包括kafka,以及一些聚合。
下面是我的伪代码的样子,

Dataset<Row> dataset = spark.readStream()......

dataset.writeStream().foreachBatch(
// do some processing, including aggregations
// write it to multiple sinks
batch.write().format('kafka').save();
).start().awaitTermination();

当我在foreach中尝试某种聚合方法时,默认情况下,它采用append模式并删除旧的聚合。因此,输出只包含当前批处理的结果。
我的要求是,当第二批数据到达时,它应该与第一批数据的结果合并。
例如:对于查询, dataset.groupBy("id").count(value) 如果第一批输入是:{“id”:1,“value”:1},{“id”:1,“value”:1}
输出:{“id”:1,“value”:“2”}
第二批输入:{“id”:1,“value”:3},{“id”:1,“value”:2}
输出:{“id”:1,“value”:5}
预期输出:{“id”:1,“value”:7}
如何在spark中实现这一点?
提前谢谢。

z8dt9xmd

z8dt9xmd1#

上面的示例是dstream示例,而不是结构化流。您需要将spark结构化流看作是将数据加载到一个无界表中。
假设数据源是kafka,下面是结构化流的一个基本示例。请注意,使用readstream和writestream api无法进行模式推断。模式需要来自数据源连接器,在本例中是kafka。

val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9031")
  .option("subscribe", "word-count")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  .as[(String, String)]
  .select(from_json($"value", schema = schema).as("data"))
  .writeStream
  .format("parquet")
  .option("path", "/parquet/word-count/")
  .option("checkpointLocation", "/tmp/word-count-chkpnt")
  .trigger(ProcessingTime("10 second"))
  .outputMode(OutputMode.Append())
  .start()

使用.trigger()函数创建微批,outputmode保存每个微批的结果。在这个例子中,我每10秒创建一个微批, .trigger(ProcessingTime("10 second")) 以及将流中的每个事件作为一行附加到parquet文件 .outputMode(OutputMode.Append()) 在您的例子中,您需要使用.trigger(),并选择一个微批处理间隔 .outputMode(outputMode.Update()) 插入具有值的新键或使用递增值更新现有键。
下面的部分是聚合逻辑的发展方向。您可以将聚合逻辑分解为单独的Dataframe,并将Dataframe作为流写入,而不是为了可读性而进行链接。

.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  .as[(String, String)]
  .select(from_json($"value", schema = schema).as("data"))

结构化流媒体的另一个例子。

相关问题