pyspark - I want to compare two DataFrames with the same columns

w6mmgewl asked 4 months ago in Spark

I want to compare two DataFrames that have the same columns; I just need to add a last_change_date column to df1, populated from df2's last_change_date.
Sample data:

data_df = [("John", 25, "Male", "Engineer", 1),
           ("Alice", 30, "Female", "Doctor", 2)]

data_df2 = [("1","John", 25, "Male", "Engineer", "2023-01-01"),
              ("2","Alice", 30, "Female", "Doctor", "2023-01-02")]

I am trying this code:

from pyspark.sql.functions import col, when, current_timestamp

def compare_dataframes(df1, df2, key_fields):
    df1_aliases = [col(f"{field}").alias(f"{field}_df1") for field in df1.columns if field not in key_fields]
    df2_aliases = [col(f"{field}").alias(f"{field}_df2") for field in df2.columns if field not in key_fields]
    join_conditions = [col(f"{field}_df1") == col(f"{field}_df2") for field in key_fields]
    result_df = df1.select(*df1.columns, *df1_aliases).alias("df1").join(
        df2.select(*df2.columns, *df2_aliases).alias("df2"),
        on=join_conditions,
        how="inner"
    )
    for field in df1.columns:
        if field not in key_fields:
            result_df = result_df.withColumn(
                f"last_change_date_{field}",
                when(
                    (col(f"{field}_df1") != col(f"{field}_df2")) | col(f"last_change_date_{field}_df1").isNull(),
                    current_timestamp()
                ).otherwise(col(f"last_change_date_{field}_df2"))
            )

    return result_df


But I am not able to validate by field; I tried using aliases, but it gives this error:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Key_df1` cannot be resolved. Did you mean one of the following? [`df1`.`Age`, `df2`.`Age`, `df1`.`Age_df1`, `df1`.`Key`, `df2`.`Key`].;
'Join Inner, ('Key_df1 = 'Key_df2)
:- SubqueryAlias df1
:  +- Project [Name#6424, Age#6425L, Key#6428L, Name#6424 AS Name_df1#6483, Age#6425L AS Age_df1#6484L]
:     +- Project [Name#6424, Age#6425L, Key#6428L]
:        +- LogicalRDD [Name#6424, Age#6425L, Gender#6426, Occupation#6427, Key#6428L], false
+- SubqueryAlias df2
   +- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439, Name#6435 AS Name_df2#6485, Age#6436L AS Age_df2#6486L, last_change_date#6439 AS last_change_date_df2#6487]
      +- Project [Key#6434, Name#6435, Age#6436L, last_change_date#6439]
         +- LogicalRDD [Key#6434, Name#6435, Age#6436L, Gender#6437, Occupation#6438, last_change_date#6439], false.


How can I get the desired result?


uplii1fm1#

See if the answer below helps. You can run this column-wise iteration on the joined DataFrame. In short, you don't need to append the _df1/_df2 aliases: your comprehensions skip the key fields, so a column like Key_df1 is never created, which is why the join condition cannot be resolved. You can reference the columns directly through their parent DataFrames.

from pyspark.sql.functions import when, current_timestamp

non_key_fields = [x for x in df1.columns if x not in key_fields]
for cl in non_key_fields:
    # stamp now when the value changed or no prior date exists; else keep df2's date
    final_df = final_df.withColumn("last_change_" + cl,
        when((df1[cl] != df2[cl]) | df2["last_change_date"].isNull(),
             current_timestamp()).otherwise(df2["last_change_date"]))
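
For context, a minimal sketch of how final_df could be built before the loop above. The answer does not show this step, so the key column ("id", taken from the question's sample data) and the inner join are assumptions:

# Hypothetical setup for the loop above: final_df is df1 joined to df2
# on the shared key; "id" is assumed from the question's sample data.
final_df = df1.join(df2, on=(df1["id"] == df2["id"]), how="inner")

Because final_df is derived from df1 and df2, expressions like df1[cl] and df2["last_change_date"] in the loop still resolve against the joined result.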



yxyvkwin2#

If you just want to add a single column to df1, you can simply use a join and reassign the DataFrame to add that column. That is much better than iterating over the values and adding it column by column.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("DataFrameComparison").getOrCreate()

# Sample data
data_df = [("John", 25, "Male", "Engineer", 1),
           ("Alice", 30, "Female", "Doctor", 2)]

data_df2 = [("1", "John", 25, "Male", "Engineer", "2023-01-01"),
            ("2", "Alice", 30, "Female", "Doctor", "2023-01-02")]

# Create DataFrames
df1 = spark.createDataFrame(data_df, ["name", "age", "gender", "profession", "id"])
df2 = spark.createDataFrame(data_df2, ["id", "name", "age", "gender", "profession", "last_change_date"])

# Join the DataFrames on the common key columns and pull in last_change_date
df1 = df1.join(df2, (df1.id == df2.id) & (df1.name == df2.name)) \
         .select(df1["*"], df2["last_change_date"])

df1.show()


Result:

+-----+---+------+----------+---+----------------+
| name|age|gender|profession| id|last_change_date|
+-----+---+------+----------+---+----------------+
| John| 25|  Male|  Engineer|  1|      2023-01-01|
|Alice| 30|Female|    Doctor|  2|      2023-01-02|
+-----+---+------+----------+---+----------------+
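
As a footnote, a name-based equi-join is a possible variant (an assumption, not part of the original answer): passing the key names as a list deduplicates the join columns, so selecting df1["*"] to drop df2's copies becomes unnecessary. The df1_with_date name is hypothetical:

# Variant sketch (starting from the original df1, before the join above):
# select only the column to carry over, then join on column names so the
# shared keys ("id", "name") appear only once in the result.
df1_with_date = df1.join(df2.select("id", "name", "last_change_date"),
                         on=["id", "name"], how="inner")
df1_with_date.show()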
