我想比较两个具有相同列的dfs,我只需要在df 1中添加一列last_change_date,它将通过df 2 last_change_date添加。
示例框:-
data_df = [("John", 25, "Male", "Engineer", 1),
("Alice", 30, "Female", "Doctor", 2)]
data_df2 = [("1","John", 25, "Male", "Engineer", "2023-01-01"),
("2","Alice", 30, "Female", "Doctor", "2023-01-02")]
字符串
我正在尝试这个代码:
def compare_dataframes(df1, df2, key_fields):
df1_aliases = [col(f"{field}").alias(f"{field}_df1") for field in df1.columns if field not in key_fields]
df2_aliases = [col(f"{field}").alias(f"{field}_df2") for field in df2.columns if field not in key_fields]
join_conditions = [col(f"{field}_df1") == col(f"{field}_df2") for field in key_fields]
result_df = df1.select(*df1.columns, *df1_aliases).alias("df1").join(
df2.select(*df2.columns, *df2_aliases).alias("df2"),
on=join_conditions,
how="inner"
)
for field in df1.columns:
if field not in key_fields:
result_df = result_df.withColumn(
f"last_change_date_{field}",
when(
(col(f"{field}_df1") != col(f"{field}_df2")) | col(f"last_change_date_{field}_df1").isNull(),
current_timestamp()
).otherwise(col(f"last_change_date_{field}_df2"))
)
return result_df
型
但我无法通过字段进行验证,我尝试使用别名,但它给出了错误:-
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Key_df1` cannot be resolved. Did you mean one of the following? [`df1`.`Age`, `df2`.`Age`, `df1`.`Age_df1`, `df1`.`Key`, `df2`.`Key`].;
'Join Inner, ('Key_df1 = 'Key_df2)
:- SubqueryAlias df1
: +- Project [Name#6424, Age#6425L, Key#6428L, Name#6424 AS Name_df1#6483, Age#6425L AS Age_df1#6484L]
: +- Project [Name#6424, Age#6425L, Key#6428L]
: +- LogicalRDD [Name#6424, Age#6425L, Gender#6426, Occupation#6427, Key#6428L], false
+- SubqueryAlias df2
+- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439, Name#6435 AS Name_df2#6485, Age#6436L AS Age_df2#6486L, last_change_date#6439 AS last_change_date_df2#6487]
+- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439]
+- LogicalRDD [Key#6434, Name#6435, Age#6436L, Gender#6437, Occupation#6438, last_change_date#6439], false.
型
我怎样才能得到想要的结果。
2条答案
按热度按时间uplii1fm1#
请看下面的答案是否有帮助。你可以在合并的df上运行这个列式迭代。简而言之,你不需要附加df1和df2别名。你可以直接用它们的父df引用它们。
字符串
yxyvkwin2#
如果你只是想在DF1中添加一个列,你可以简单地使用连接并更新DF来添加该列。这比迭代值并添加它要好得多。
字符串
结果
型