ApacheSpark—当数据组中的一行满足条件时,对该组进行筛选

vawmfj5a  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(350)

关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。

三个月前关门了。
改进这个问题
我想删除数据中的组(按id列分组),如果组中没有特定月份(2017-01-01),则可以是任何月份。

+-----+----------+------------------+
|   id|     month|             price|
+-----+----------+------------------+
|  abc|2017-01-01|               1.0|<--
|  abc|2017-02-01|               1.0|
|  abc|2017-03-01|0.9933874274883838|
|  abc|2017-04-01|0.9886929385353734|
|  abc|2017-05-01|1.0665978253021122|
|  abc|2017-06-01|1.0314266473278149|
|  bcd|2017-02-01|1.0463297958471622|<-- no entry for 2017-01-01, filter this group out
|  bcd|2017-03-01|1.0712818805540645|
|  bcd|2017-04-01|0.9622054745273114|
|  bcd|2017-05-01|1.0410563146608105|
|  bcd|2017-06-01|               1.0|
|  cde|2017-01-01|               1.0|<--
|  cde|2017-02-01|  0.95786111804302|
|  cde|2017-03-01|  0.99786744034189|
|  cde|2017-04-01|0.9943516278425732|
|  cde|2017-05-01|0.9770065053504754|
|  cde|2017-06-01| 1.006149042146841|
+-----+----------+------------------+

我已经做了一个窗口功能,所以寻找一个更好的替代如何做到这一点。因为对于一个简单/常见的任务来说,这似乎是一个很长的解决方法。

df.withColumn('cond', sum(when('month' == '2017-01-01', 1)).over(Window.partitionBy('id'))

df.filter(col('cond') > 1)
Expected output:
+-----+----------+------------------+
|   id|     month|             price|
+-----+----------+------------------+
|  abc|2017-01-01|               1.0|
|  abc|2017-02-01|               1.0|
|  abc|2017-03-01|0.9933874274883838|
|  abc|2017-04-01|0.9886929385353734|
|  abc|2017-05-01|1.0665978253021122|
|  abc|2017-06-01|1.0314266473278149|
|  cde|2017-01-01|               1.0|
|  cde|2017-02-01|  0.95786111804302|
|  cde|2017-03-01|  0.99786744034189|
|  cde|2017-04-01|0.9943516278425732|
|  cde|2017-05-01|0.9770065053504754|
|  cde|2017-06-01| 1.006149042146841|
+-----+----------+------------------+
sirbozc5

sirbozc51#

我会用min而不是sum。

df = df.withColumn("base_month", F.min("month").over(Window.partitionBy('id'))
df = df.where("base_month = '2017-01-01'").drop("base_month")

编辑:另一种方式!

df = df.withColumn("month_list", F.collect_set("month").over(Window.partitionBy('id'))
df = df.where("array_contains(month_list, '2017-01-01'").drop("month_list")

相关问题