How to dynamically remove rows based on ID and Status = "Removed" in Scala

aydmsdu9  asked on 2021-05-27  in Spark

Here is a sample dataset. How can I dynamically drop every row for an ID that has (column) status = "removed", without hard-coding any values?
Sample dataset:

ID  Status       Date      Amount
1   New         01/05/20    20
1   Assigned    02/05/20    30
1   In-Progress 02/05/20    50
2   New         02/05/20    30
2   Removed     03/05/20    20
3   In-Progress 09/05/20    50
3   Removed     09/05/20    20
4   New         10/05/20    20
4   Assigned    10/05/20    30

Expected result:

ID  Status       Date      Amount
1   New         01/05/20    20
1   Assigned    02/05/20    30
1   In-Progress 02/05/20    50
4   New         10/05/20    20
4   Assigned    10/05/20    30

Thanks in advance.


y53ybaqx1#

You can use filter (or where) together with not like / rlike to filter out the records with status = "removed" from the DataFrame.

import org.apache.spark.sql.functions._

//assuming df is the dataframe
//using filter or where clause; trim removes surrounding spaces, lower converts to lower case
val df1=df.filter(lower(trim(col("status"))) =!= "removed")

//or filter on the literal "Removed" (this won't match if the casing differs)
val df1=df.filter(col("status") =!= "Removed")

//using not like
val df1=df.filter(!lower(col("status")).like("removed"))

//using not rlike
val df1=df.filter(!col("status").rlike(".*(?i)removed.*"))

Now the df1 DataFrame will contain only the non-removed records. Note that this simple filter keeps the remaining rows of IDs 2 and 3, so it does not by itself drop the whole ID.
UPDATE: From Spark 2.4: for this case we can use a join or a window clause to drop every row of an ID that has a Removed status.

val df=Seq((1,"New","01/05/20","20"),(1,"Assigned","02/05/20","30"),(1,"In-Progress","02/05/20","50"),(2,"New","02/05/20","30"),(2,"Removed","03/05/20","20"),(3,"In-Progress","09/05/20","50"),(3,"Removed","09/05/20","20"),(4,"New","10/05/20","20"),(4,"Assigned","10/05/20","30")).toDF("ID","Status","Date","Amount")

import org.apache.spark.sql.expressions._

val df1=df.
groupBy("id").
agg(collect_list(lower(col("Status"))).alias("status_arr"))

//using array_contains function
df.alias("t1").join(df1.alias("t2"),Seq("id"),"inner").
filter(!array_contains(col("status_arr"),"removed")).
drop("status_arr").show()

//without join using window clause
val w=Window.partitionBy("id").orderBy("Status").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("status_arr",collect_list(lower(col("status"))).over(w)).
filter(!array_contains(col("status_arr"),"removed")).
drop("status_arr").
show()

//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+
For Spark < 2.4:
val df1=df.groupBy("id").agg(concat_ws("",collect_list(lower(col("Status")))).alias("status_arr"))

df.alias("t1").join(df1.alias("t2"),Seq("id"),"inner").
filter(!col("status_arr").contains("removed")).
drop(col("status_arr")).
show()

//Using window functions
df.withColumn("status_arr",concat_ws("",collect_list(lower(col("status"))).over(w))).
filter(!col("status_arr").contains("removed")).
drop(col("status_arr")).
show(false)
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+
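
The same result can also be obtained without collecting an array per ID, for example with a left-anti join against the IDs that contain a Removed row. This is only a minimal sketch (not part of the original answer), assuming the same df and the org.apache.spark.sql.functions._ import from above:

//IDs that have at least one Removed row
val removedIds=df.filter(lower(trim(col("Status"))) === "removed").select("ID").distinct()

//left_anti join keeps only the rows whose ID is NOT in removedIds
df.join(removedIds,Seq("ID"),"left_anti").show()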

30byixjq2#

Assuming res0 is your dataset, you can do:

import spark.implicits._
val x = res0.where($"Status" =!= "Removed")
x.show()

This removes the rows whose status is Removed, but based on what you posted above it will not give you the result you want to achieve, since the remaining rows for IDs 2 and 3 are still kept.
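
If the goal is to drop every row of an ID that has a Removed status (as in the expected output), one way to extend this is to flag those IDs with a window aggregate and filter them out. A rough sketch (not from the original answer), assuming the same res0 and the spark.implicits._ import above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//flag each ID that contains at least one Removed row, then keep only unflagged IDs
val byId=Window.partitionBy("ID")
val x=res0.withColumn("has_removed",max(when($"Status" === "Removed",1).otherwise(0)).over(byId)).
  filter($"has_removed" === 0).
  drop("has_removed")
x.show()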
