使用spark和scala过滤dataframe上的两个时间戳

nc1teljy  于 12个月前  发布在  Scala
关注(0)|答案(1)|浏览(80)

我使用Scala“2.12.10”和Spark“3.4.0”。
我使用dataframe读取Postgres表,我的dataframe看起来像这样:
| 用户ID|金额|日期|
| - -----|- -----|- -----|
| 1|十点零二分|2023-01-28 19:22:59.266508|
| 1| 2.02| 2023-01-28 20:22:59.266508|
| 1| 5个|2023-02-28 12:21:34.466508|
| 2|十八点三十二分|2019 -01-18 01:34:01.222408|
我想从dataframe中过滤每一行,并在发生以下两种情况时累积数量(总和):如果注册表属于同一个用户,并且日期在之前,则生成一个新的数据集,如下所示:
| 用户ID|金额|日期|累计量|
| - -----|- -----|- -----|- -----|
| 1|十点零二分|2023-01-28 2023-01-28 2023-01-28|12.04|
| 1| 5个|2023-02-28 2023-02-28 2023-02-28|17.04|
| 2|十八点三十二分|2023-01-18 2023-01-18 2023-01-18|十八点三十二分|

注意第一行的结果与同一天累计(累计两个第一行)
注意第二行符合要求(前3行累计)

我该如何做到这一点?谢谢!

ldioqlga

ldioqlga1#

您可以创建一个按userID分区并按日期排序的窗口,确保groupBy并按日期对所有金额求和:

val resDf = df.withColumn("date", to_date(col("date"))).groupBy("date").agg(first("userID").alias("userID"),
  first("amount").alias("amount"),
  sum("amount").alias("acummulated_amount"))
  .withColumn("acummulated_amount", sum("acummulated_amount").over(Window.partitionBy("userID").orderBy("date")))
resDf.show()

结果:

+----------+------+------+------------------+
|      date|userID|amount|acummulated_amount|
+----------+------+------+------------------+
|2023-01-28|     1| 10.02|             12.04|
|2023-02-28|     1|   5.0|             17.04|
|2023-01-18|     2| 18.32|             18.32|
+----------+------+------+------------------+

相关问题