python—使用特定条件合并两个df以形成一个df

ruyhziif  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(485)

在spark python中,我必须合并两个具有相同列名和类型的Dataframe:合并Dataframe时,存在几个条件:
如果 UPDATE_TYPE 是否更新忽略该行中除 DISPOSITION 以及 TIME . 对于处置,使用该行中处置的值(更新类型为update)。对于时间,使用两行中较大的时间值
如果 UPDATE_TYPE 不是 UPDATE 保留表2中除时间以外的所有值。为了时间,使用更大的时间价值。
我已经做了,但使用的是reducebykey。这是一个非常缓慢的解决方案。我可以直接使用df吗?

df1
ID      UPDATE_TYPE TIME DISPOSITION ROG
1       SEGMENT     1000 null        Q
2       SEGMENT     1001 value       W
3       SEGMENT     1002 null        E
3       UPDATE      1004 some_value  A
4       SEGMENT     1003 null        R
5       SEGMENT     1004 value_old   T  
7       SEGMENT     1050 value       U

df2
ID      UPDATE_TYPE TIME DISPOSITION ROG
4       SEGMENT     1003 value        P1
5       UPDATE      1015 value_new    P2
6       SEGMENT     1010 value        P3
Final output
df_output
ID      UPDATE_TYPE TIME DISPOSITION ROG
1       SEGMENT     1000 null        Q
2       SEGMENT     1001 value       W
3       SEGMENT     1004 some_value  E
4       SEGMENT     1003 null        P1
5       SEGMENT     1015 value_new   T  
6       SEGMENT     1010 value       P3
7       SEGMENT     1050 value       U

编辑它会发现表1本身中可能存在重复的ID。这些重复的id不能出现在最终输出中

uqjltbpv

uqjltbpv1#

您可以尝试spark sql:

SELECT 
DF1.ID,
DF1.UPDATE_TYPE,
CASE WHEN DF1.TIME > DF2.TIME THEN DF1.TIME ELSE DF2.TIME END AS TIME,
CASE WHEN DF2.UPDATE_TYPE='SEGMENT' THEN DF1.DISPOSITION ELSE DF2.DISPOSITION END AS DISPOSITION,
CASE WHEN DF2.UPDATE_TYPE='SEGMENT' THEN DF2.ROG ELSE DF1.ROG END AS ROG
FROM 
DF1 LEFT JOIN DF2 ON DF1.ID = DF2.ID

相关问题