spark结构化流媒体-对最近x小时的数据进行实时聚合

mkshixfv  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(145)

我是spark的新手,我正在使用spark结构化流来读取scala语言中kafka的数据流。
我想使用apachespark聚合最后x小时的数据,并且(如果可能)只将更新写入目标,
假设我想要顾客的最低价格 ID1 在过去的1小时里,如果我有以下事件:

Events Data:
+------------------------------------------+-------------------------+---------+
|event_time                                |customer                 |price    |
+------------------------------------------+-------------------------+---------+
| 2021-03-09 11:00:00                      |ID1                      |2000     |
| 2021-03-09 11:28:00                      |ID1                      |1500     |
| 2021-03-09 15:20:00                      |ID1                      |2500     |
+------------------------------------------+-------------------------+---------+
at 2021-03-09 11:00:00 desired output (data between 10:00:00 and 11:00:00) :
+-------------------------+------------+
|customer                 |min_price   |
+-------------------------+------------+
|ID1                      |2000        |
+-------------------------+------------+
at 2021-03-09 11:28:00 desired output (data between 10:28:00 and 11:28:00):
+-------------------------+------------+
|customer                 |min_price   |
+-------------------------+------------+
|ID1                      |1500        |
+-------------------------+------------+
at 2021-03-09 15:20:00 desired output (data between 14:20:00 and 15:20:00):
+-------------------------+------------+
|customer                 |min_price   |
+-------------------------+------------+
|ID1                      |2500        |
+-------------------------+------------+

相反,Kafka是不断输出1500我试图过滤输入流到最后一个小时我试图使用滑动窗口,但我得到了太多的窗口,我只需要最后一个窗口,以最后一个事件时间结束。

val df = spark.readStream
              .format("kafka").option("kafka.bootstrap.servers", brokers)
              .option("subscribe", topics)
              .option("startingOffsets", "latest").load()

val ds1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
// some more transformation ds1 => data

// this is filtering try

filteredData = data.filter($"event_time" > (current_timestamp() - expr("INTERVAL 1 hours")))
results = filteredData.groupBy($"customer").agg(min("price").alias("min_price"))

//this is sliding window
results = filteredData.groupBy(window($"event_time", "1 hours", "5 minutes"),$"customer")
                      .agg(min("price").alias("min_price"))

出于测试目的,我正在向console写信
这在spark结构化流媒体中可行吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题