Compare a string value with a Spark DataFrame column and update the string based on a condition

Posted by 1zmg4dgp on 2021-07-14 in Spark

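I have an input DataFrame with id, group and date fields. I need to compare input_df against a string that holds the last-run date of each group and, if most of the records for a group in the input DataFrame carry a newer date, update that group's date in the string.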

val lastRunDtDetails = "A#2021-04-02,B#2021-04-01,C#2021-04-02"

val input_df = sc.parallelize(Seq(
  (1,"A","2021-04-01"),(2,"A","2021-04-02"),(3,"A","2021-04-02"),(4,"A","2021-04-02"),(5,"A","2021-04-03"),
  (6,"B","2021-04-01"),(7,"B","2021-04-02"),(8,"B","2021-04-02"),(9,"B","2021-04-02"),(10,"B","2021-04-02"),
  (11,"C","2021-04-01"),(12,"C","2021-04-01"),(13,"C","2021-04-01"),(14,"C","2021-04-02"),(15,"C","2021-04-03")
)).toDF("id","group","date")

input_df.show()
+---+-----+----------+
| id|group|      date|
+---+-----+----------+
|  1|    A|2021-04-01|
|  2|    A|2021-04-02|
|  3|    A|2021-04-02|
|  4|    A|2021-04-02|
|  5|    A|2021-04-03|
|  6|    B|2021-04-01|
|  7|    B|2021-04-02|
|  8|    B|2021-04-02|
|  9|    B|2021-04-02|
| 10|    B|2021-04-02|
| 11|    C|2021-04-01|
| 12|    C|2021-04-01|
| 13|    C|2021-04-01|
| 14|    C|2021-04-02|
| 15|    C|2021-04-03|
+---+-----+----------+

val input_df_count = input_df.groupBy("group").count.orderBy("group")
input_df_count: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [group: string, count: bigint]    

input_df_count.show()
+-----+-----+
|group|count|
+-----+-----+
|    A|    5|
|    B|    5|
|    C|    5|
+-----+-----+

val max_dt_count_df = input_df.groupBy("group", "date").count().groupBy("group").agg(max(struct("count", "date")) as "max_dt").select($"group", $"max_dt.date",$"max_dt.count" as "max_count").orderBy("group")
max_dt_count_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [group: string, date: string ... 1 more field]    

max_dt_count_df.show()
+-----+----------+---------+
|group|      date|max_count|
+-----+----------+---------+
|    A|2021-04-02|        3|
|    B|2021-04-02|        4|
|    C|2021-04-01|        3|
+-----+----------+---------+

val percent_df= input_df_count.join(max_dt_count_df, input_df_count("group") === max_dt_count_df("group"), "inner").select(input_df_count("*"), max_dt_count_df("date"), max_dt_count_df("max_count")).withColumn("percentile", ($"max_count"/$"count")*100).orderBy("group")
2021-04-07 16:40:33 WARN  Column:66 - Constructing trivially true equals predicate, 'group#23 = group#23'. Perhaps you need to use aliases.
percent_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [group: string, count: bigint ... 3 more fields]    

percent_df.show()
+-----+-----+----------+---------+----------+
|group|count|      date|max_count|percentile|
+-----+-----+----------+---------+----------+
|    A|    5|2021-04-02|        3|      60.0|
|    B|    5|2021-04-02|        4|      80.0|
|    C|    5|2021-04-01|        3|      60.0|
+-----+-----+----------+---------+----------+
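The self-join above is what triggers the "trivially true equals predicate" warning, because both sides derive from input_df. As a minimal sketch (not the author's method), the same per-group summary can probably be built in one pass by attaching the group total with a window function before picking the most frequent date. The names `dominant_df`, `total` and `m` are illustrative, and the local `SparkSession` is only there to make the snippet self-contained; in spark-shell `spark` already exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, struct, sum}

// Local session for this sketch only (assumption: not needed inside spark-shell).
val spark = SparkSession.builder().master("local[*]").appName("dominant-date").getOrCreate()
import spark.implicits._

// Same sample data as in the question.
val input_df = Seq(
  (1, "A", "2021-04-01"), (2, "A", "2021-04-02"), (3, "A", "2021-04-02"),
  (4, "A", "2021-04-02"), (5, "A", "2021-04-03"),
  (6, "B", "2021-04-01"), (7, "B", "2021-04-02"), (8, "B", "2021-04-02"),
  (9, "B", "2021-04-02"), (10, "B", "2021-04-02"),
  (11, "C", "2021-04-01"), (12, "C", "2021-04-01"), (13, "C", "2021-04-01"),
  (14, "C", "2021-04-02"), (15, "C", "2021-04-03")
).toDF("id", "group", "date")

// `dominant_df` is a hypothetical name for the single-pass summary.
val dominant_df = input_df
  .groupBy("group", "date").count()                                    // rows per (group, date)
  .withColumn("total", sum("count").over(Window.partitionBy("group"))) // rows per group
  .groupBy("group")
  // max over a struct compares field by field: highest count wins, ties go to the later date
  .agg(max(struct(col("count"), col("date"), col("total"))).as("m"))
  .select(
    col("group"),
    col("m.total").as("count"),
    col("m.date").as("date"),
    col("m.count").as("max_count"),
    (col("m.count") * 100.0 / col("m.total")).as("percentile"))
  .orderBy("group")

dominant_df.show()
```

Because `total` is constant within a group, adding it to the struct does not change which date wins; it only carries the group size through the aggregation so no join is needed.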

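Now I want to compare percent_df with the dates in the input string: if the date from the DataFrame is later than the date from the string, and the percentage from the DataFrame is greater than 75%, the string should be updated with the date from the DataFrame. So for the input above, the string I expect is: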

val newDatesDetails = "A#2021-04-02,B#2021-04-02,C#2021-04-02"

I can achieve this by converting the input string to a DataFrame.

val last_run_details_df = sc.parallelize(lastRunDtDetails.split(",").map(_.split("#")).map{ case Array(a,b) => (a, b) }).toDF("group", "previous_date")
last_run_details_df: org.apache.spark.sql.DataFrame = [group: string, previous_date: string]    

last_run_details_df.show()
+-----+-------------+
|group|previous_date|
+-----+-------------+
|    A|   2021-04-02|
|    B|   2021-04-01|
|    C|   2021-04-02|
+-----+-------------+

val temp_df = percent_df.join(last_run_details_df, percent_df("group") === last_run_details_df("group"), "inner").select(percent_df("*"), last_run_details_df("previous_date")).orderBy("group")    

temp_df.withColumn("new_dates", when($"date" >= $"previous_date" && $"percentile" >= 75, $"date").otherwise($"previous_date")).show()     

val newDatesDetails = temp_df.withColumn("new_dates", when($"date" >= $"previous_date" && $"percentile" >= 75, $"date").otherwise($"previous_date")).select(concat(col("group"),lit("#"),col("new_dates")) as "new_dates").collect.mkString(",").replaceAll("\\[|\\]","")

newDatesDetails
res49: String = A#2021-04-02,B#2021-04-02,C#2021-04-02

I don't think this is the ideal way to get the new date details. Is there a better way to derive the final string shown above?
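One possible simplification, offered as a sketch rather than a definitive answer: percent_df holds only one row per group, so it can be collected to the driver and the string updated with plain Scala, without turning the string into a DataFrame and joining. In the snippet below `summaries` is a hypothetical stand-in for that collected result, and `updateDates` and `threshold` are illustrative names. It uses `>=` for both conditions, as the code in the question does.

```scala
// Previous run details, as in the question.
val lastRunDtDetails = "A#2021-04-02,B#2021-04-01,C#2021-04-02"

// Hypothetical stand-in for the collected summary: group -> (most frequent date, share in %).
// With Spark this could be built from
//   percent_df.select("group", "date", "percentile").collect()
val summaries: Map[String, (String, Double)] = Map(
  "A" -> ("2021-04-02", 60.0), // 3 of 5 rows
  "B" -> ("2021-04-02", 80.0), // 4 of 5 rows
  "C" -> ("2021-04-01", 60.0)  // 3 of 5 rows
)

// Keep the previous date unless the group's dominant date is at least as recent
// and covers at least `threshold` percent of the group's rows.
// ISO yyyy-MM-dd strings sort chronologically, so string comparison is enough here.
def updateDates(previous: String,
                summaries: Map[String, (String, Double)],
                threshold: Double = 75.0): String =
  previous.split(",").map(_.split("#")).map { case Array(group, prevDate) =>
    val newDate = summaries.get(group) match {
      case Some((date, pct)) if date >= prevDate && pct >= threshold => date
      case _ => prevDate
    }
    s"$group#$newDate"
  }.mkString(",")

val newDatesDetails = updateDates(lastRunDtDetails, summaries)
println(newDatesDetails)
```

Unlike the inner join in the question, this keeps groups that appear in the string but not in the DataFrame, leaving their dates unchanged. It also preserves the order of the original string, so no orderBy is needed.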
