给定一组不同电子邮件ID的事务。例如:
val df = Seq(
("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
).toDF("email", "timestamp", "transaction_id", "condition")
我想得到的是过去24小时内按电子邮件分组的交易数量 condition
这是真的。如果 condition
是假的,我只想 count
列,以包含 condition
这是真的。以上结果如下:
val expectedDF = Seq(
("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false, 0),
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")
到目前为止我所做的是:
val new_df = df
.withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))
val winSpec = Window
.partitionBy("email")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-24*3600, Window.currentRow)
val resultDF = new_df
.filter(col("condition"))
.withColumn("count", count(col("email")).over(winSpec))
resultDF.show()
它打印的是没有带 condition
==false条件,但我希望所有行都具有正确的计数值,如 expectedDF
:
("email", | "timestamp" | "transaction_id" | "condition" | "count")
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1)
我找不到一种方法来应用窗口函数,它只在条件为真时计算,否则在条件为真时复制上一个好值。任何帮助都将不胜感激。
1条答案
按热度按时间zpjtge221#
不过滤,只使用
when
.