Comparing two very large Hive tables in PySpark

s71maibg · posted 5 months ago · in Spark

I need to pull data from two Hive tables, which are very large. They live in two different schemas but have identical definitions.
I need to compare the two tables and determine the following in PySpark:
1. Rows that exist in Table1 but are missing in Table2
2. Rows that exist in both tables but where the value of any non-key column does not match
3. Rows that exist in Table2 but are missing in Table1
For example, suppose the tables have the following columns:

ProductId - BigInteger - PK
ProductVersion - int - PK
ProductName - char
ProductPrice - decimal
ProductDesc - varchar

Suppose the data is as follows:

Table1 in Schema1
[1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"] -> Matches with Table2
[1, 2, "T-Shirt", 10.50, "Soft-Washed Striped Crew-Neck "] -> Price is different in Table1
[2, 1, "Short Sleeve Shirt", 10.50, "Everyday Printed Short-Sleeve Shirt"] -> Missing in Table2
[3, 1, "T-Shirt", 10.50, "Breathe ON Camo Tee"] -> Prod Desc is different in Table2

Table2 in Schema2
[1, 1, "T-Shirt", 10.50, "Soft-Washed Slub-Knit V-Neck"]  -> Matches with Table1
[2, 1, "Short Sleeve Shirt", 12.50, "Everyday Printed Short-Sleeve Shirt"]  -> Price is different 
[3, 1, "T-Shirt", 10.50, "Breathe ON Camo"] -> Prod Desc is different in Table2
[3, 2, "T-Shirt", 20, "Breathe ON ColorBlock Tee"] -> Missing in Table1


The expected result would be three separate DataFrames:

  1. dfOut1 - will contain rows that exist in Table1 but, based on the primary key, are missing in Table2
["Missing in Table2", [1, 2, "T-Shirt", 10.50, "Soft-Washed Striped Crew-Neck "]]


The first column will indicate the type of difference. When the difference type is "Missing in Table1" or "Missing in Table2", the entire row from the source table will be available.

  2. dfDiff - will contain rows that exist in both tables but differ in a non-key column, one row per mismatched column: the difference type, the column name, the key values, then the Table1 and Table2 values
["Difference", "ProductPrice", 2, 1, 10.50, 12.50]
["Difference", "ProductDesc", 3,1,  "Breathe ON Camo Tee",  "Breathe ON Camo"]

  3. dfOut2 - will contain rows that exist in Table2 but are missing in Table1
["Missing in Table1", [3, 2, "T-Shirt", 20, "Breathe ON ColorBlock Tee"]]


I was thinking of taking the following approach:

1. Create df1 from table1 using query "select * from schema1.table1"
2. Create df2 from table2 using query "select * from schema2.table2"
3. Use df1.except(df2)


I was referring to the documentation.
I am not sure whether this approach will work. Will df1.except(df2) compare all the columns, or only the key columns?
Also, I am not sure how to further separate the output into the three DataFrames.


6mzjoqzu #1

You are basically trying to find the inserts, updates and deletes (the delta) between two datasets.
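
On the except() question first: in the Python API this operation is exposed as subtract() and exceptAll() (except is a reserved word in Python), and both compare entire rows, not just the key columns. A minimal sketch, assuming an active spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "val"])
df2 = spark.createDataFrame([(1, "A"), (2, "X")], ["id", "val"])

# exceptAll() matches on whole rows: (2, "B") is returned even though
# a row with key 2 exists in df2, because a non-key column differs.
df1.exceptAll(df2).show()
# +---+---+
# | id|val|
# +---+---+
# |  2|  B|
# +---+---+

So an except-style operator alone cannot distinguish "missing by key" from "same key, different values"; the hash-and-join approach below makes that split explicit by hashing the key columns and the full row separately.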

from pyspark.sql.functions import sha2, concat_ws

# Comma-separated primary key columns (here: the keys of the example tables)
keys = "ProductId,ProductVersion"
# getting the comma-separated keys into a list
key_column_list = [x.strip().lower() for x in keys.split(',')]
# The column name of the change indicator column
changeindicator = "chg_id"
df_compare_curr_df = spark.sql("select * from schema1.table1")
df_compare_prev_df = spark.sql("select * from schema2.table2")
# getting the column lists (captured before the hash columns are added)
currentcolumns = df_compare_curr_df.columns
previouscolumns = df_compare_prev_df.columns
# Creating hash values so that this stays generic for any kind of delta comparison.
# Caution: concat_ws skips NULLs, so consider filling NULLs first if your data has them.
df_compare_curr_df = df_compare_curr_df.withColumn("all_hash_val", sha2(concat_ws("||", *currentcolumns), 256))
df_compare_curr_df = df_compare_curr_df.withColumn("key_val", sha2(concat_ws("||", *key_column_list), 256))
df_compare_prev_df = df_compare_prev_df.withColumn("all_hash_val", sha2(concat_ws("||", *previouscolumns), 256))
df_compare_prev_df = df_compare_prev_df.withColumn("key_val", sha2(concat_ws("||", *key_column_list), 256))
df_compare_curr_df.createOrReplaceTempView("NewTable")
df_compare_prev_df.createOrReplaceTempView("OldTable")
# Building the delta SQL: left outer joins for inserts/deletes, an inner join for updates
insert_sql = "select 'I' as " + changeindicator + ", A.* from NewTable A left outer join OldTable B on A.key_val = B.key_val where B.key_val is NULL"
update_sql = "select 'U' as " + changeindicator + ", A.* from NewTable A inner join OldTable B on A.key_val = B.key_val where A.all_hash_val != B.all_hash_val"
delete_sql = "select 'D' as " + changeindicator + ", A.* from OldTable A left outer join NewTable B on A.key_val = B.key_val where B.key_val is NULL"
nochange_sql = "select 'N' as " + changeindicator + ", A.* from OldTable A inner join NewTable B on A.key_val = B.key_val where A.all_hash_val = B.all_hash_val"
upsert_sql = insert_sql + " union " + update_sql
all_changes_sql = insert_sql + " union " + update_sql + " union " + delete_sql

df_compare_updates = spark.sql(update_sql)
df_compare_inserts = spark.sql(insert_sql)
df_compare_deletes = spark.sql(delete_sql)
df_compare_upserts = spark.sql(upsert_sql)
df_compare_changes = spark.sql(all_changes_sql)
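
The I/U/D indicators above are row-level and do not name the column that changed. To produce the dfDiff layout from the question (difference type, column name, key, Table1 value, Table2 value), one option is an inner join on the keys plus a per-column comparison, with anti-joins for the two "missing" DataFrames. A minimal sketch, assuming an active spark session and the example column names from the question:

from functools import reduce
from pyspark.sql import functions as F

key_cols = ["ProductId", "ProductVersion"]
nonkey_cols = ["ProductName", "ProductPrice", "ProductDesc"]

t1 = spark.table("schema1.table1").alias("t1")
t2 = spark.table("schema2.table2").alias("t2")

# Rows present in both tables; joining on the key names keeps one copy of the keys
both = t1.join(t2, key_cols, "inner")

# One output row per non-key column whose values differ. Values are cast to
# string so the per-column DataFrames can be unioned despite differing types.
diffs = [
    both.where(~F.col(f"t1.{c}").eqNullSafe(F.col(f"t2.{c}")))
        .select(F.lit("Difference").alias("DiffType"),
                F.lit(c).alias("ColumnName"),
                *key_cols,
                F.col(f"t1.{c}").cast("string").alias("Table1Value"),
                F.col(f"t2.{c}").cast("string").alias("Table2Value"))
    for c in nonkey_cols
]
dfDiff = reduce(lambda a, b: a.unionByName(b), diffs)

# Rows missing on either side, via anti-joins on the primary key
dfOut1 = t1.join(t2, key_cols, "left_anti").select(F.lit("Missing in Table2").alias("DiffType"), "t1.*")
dfOut2 = t2.join(t1, key_cols, "left_anti").select(F.lit("Missing in Table1").alias("DiffType"), "t2.*")

Note that eqNullSafe() treats NULL = NULL as equal, so a NULL on both sides is not reported as a difference.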

