Apply a Scala window function when a condition is true, otherwise fill with the last value

tzcvj98z posted on 2021-05-24 in Spark

Given a set of transactions for different email IDs, for example:

val df = Seq(
      ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
      ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
      ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
      ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
      ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
      ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
      ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
      ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
      ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
      ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
    ).toDF("email", "timestamp", "transaction_id", "condition")

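What I want is, for each email, the number of transactions in the past 24 hours where condition is true. If condition is false, I just want the count column to contain the last count from a row where condition was true. For the data above, the expected result is: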

val expectedDF = Seq(
  ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false, 0),
  ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
  ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
  ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
  ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
  ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
  ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")

What I have done so far:

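// Imports needed outside spark-shell; $"..." also needs spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, unix_timestamp}
import org.apache.spark.sql.types.LongType

// Epoch seconds, so the window range can be expressed in seconds
val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

// Per email: everything from 24 hours before the current row up to the current row
val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-24 * 3600, Window.currentRow)

// Filtering first drops the condition == false rows from the result entirely
val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

resultDF.show()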

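This prints the output below, which does not include the rows where condition == false. I want all rows to be present with the correct count values, as in expectedDF: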

("email",      | "timestamp"         | "transaction_id" | "condition" | "count")
("a@gmail.com", "2020-10-02 01:04:00", "txid-1",           true,            1),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3",           true,            2),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6",           true,            3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7",           true,            1),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8",           true,            1),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9",           true,            1)

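I can't find a way to apply a window function that counts only when the condition is true and otherwise copies the last good value. Any help would be greatly appreciated.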

zpjtge22 #1

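Don't filter; just use when inside the count. Without an otherwise branch, when returns null for rows where condition is false, and count skips nulls, so every row is kept while only the true rows inside the window are counted.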

val resultDF = new_df
  .withColumn("count", count(when(col("condition"), col("email"))).over(winSpec))

resultDF.show()

+-----------+-------------------+--------------+---------+---------------------+-----+
|      email|          timestamp|transaction_id|condition|transaction_timestamp|count|
+-----------+-------------------+--------------+---------+---------------------+-----+
|a@gmail.com|2020-10-01 01:04:00|        txid-0|    false|         1.60151424E9|    0|
|a@gmail.com|2020-10-02 01:04:00|        txid-1|     true|         1.60160064E9|    1|
|a@gmail.com|2020-10-02 02:04:00|        txid-2|    false|         1.60160424E9|    1|
|a@gmail.com|2020-10-02 03:04:00|        txid-3|     true|         1.60160784E9|    2|
|a@gmail.com|2020-10-02 04:04:00|        txid-4|    false|         1.60161144E9|    2|
|a@gmail.com|2020-10-02 04:05:00|        txid-5|    false|          1.6016115E9|    2|
|a@gmail.com|2020-10-02 05:04:00|        txid-6|     true|         1.60161504E9|    3|
|a@gmail.com|2020-10-05 12:04:00|        txid-7|     true|         1.60189944E9|    1|
|c@gmail.com|2020-12-04 06:04:00|        txid-9|     true|         1.60706184E9|    1|
|b@gmail.com|2020-12-03 03:04:00|        txid-8|     true|         1.60696464E9|    1|
+-----------+-------------------+--------------+---------+---------------------+-----+
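
One caveat that may be worth checking against your own data: on a false row, count(when(...)) reports how many true rows fall in the 24 hours before that row. That matches "copy the last count" on the sample above, but it would differ if a false row arrived more than 24 hours after the last true row (it would get 0 rather than the previous count). If a strict carry-forward is what you need, the following is a minimal sketch rather than a definitive implementation. It assumes a local SparkSession, and the names ConditionalWindowCount, counts, ts, count_in_window, count_on_true and count_filled are hypothetical, not from the original post. It keeps the count only on true rows and forward-fills it with last(..., ignoreNulls = true):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, count, last, lit, unix_timestamp, when}

object ConditionalWindowCount {

  // Returns (transaction_id, count_in_window, count_filled), ordered by email and time.
  def counts(spark: SparkSession): Seq[(String, Long, Long)] = {
    import spark.implicits._

    val df = Seq(
      ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
      ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
      ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
      ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
      ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
      ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
      ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
      ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
      ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
      ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
    ).toDF("email", "timestamp", "transaction_id", "condition")

    // Epoch seconds, so the range frame can be expressed in seconds.
    val withTs = df.withColumn("ts", unix_timestamp($"timestamp"))

    // Frame 1: the 24 hours up to and including the current row (same as the answer).
    val last24h = Window.partitionBy("email").orderBy("ts")
      .rangeBetween(-24 * 3600, Window.currentRow)

    // Frame 2: every earlier row of the same email, used only for the forward fill.
    val upToNow = Window.partitionBy("email").orderBy("ts")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val result = withTs
      // The answer's approach: count only the rows where condition is true.
      .withColumn("count_in_window", count(when($"condition", lit(1))).over(last24h))
      // Keep that count on true rows only; false rows become null.
      .withColumn("count_on_true", when($"condition", $"count_in_window"))
      // Carry the last non-null count forward; 0 if there was no true row yet.
      .withColumn(
        "count_filled",
        coalesce(last($"count_on_true", ignoreNulls = true).over(upToNow), lit(0L)))

    result
      .orderBy("email", "ts")
      .select("transaction_id", "count_in_window", "count_filled")
      .as[(String, Long, Long)]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("conditional-window-count")
      // Fixed time zone so unix_timestamp is not affected by local DST rules.
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
    counts(spark).foreach(println)
    spark.stop()
  }
}
```

On the sample data the two columns agree row for row, so the simpler count(when(...)) from the answer is sufficient here. The forward-fill variant only matters when false rows can fall outside the 24-hour reach of the last true row. Note that the rows-based fill frame assumes timestamps are unique per email; with ties, the order of the tied rows is not deterministic.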
