pyspark 使用InsertAll()/ UpdateAll()和EXCEPT关键字进行增量合并

mrphzbgm  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(48)

我想在两个Delta表上使用合并操作,但我不想编写复杂的插入/更新条件,所以理想情况下我想使用InsertAll()和UpdateAll()。它工作得很好,但我的源表包含一个额外的列,我不想填充到目标数据中,但我不能删除它,因为我需要它来识别要删除的记录。
我在文档中发现有一个EXCEPT关键字,我可以使用。有一个用SQL编写的示例:

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated)

字符串
有没有办法在PySpark中使用相同的方法?
我知道我可以编写自定义的插入和更新条件,但这会导致使用InsertAll / UpdateAll()时不会发生的模式演化问题

u5i3ibmn

u5i3ibmn1#

不幸的是,PySpark中还没有这样的功能。但它确实是以编程方式实现的,类似于以下内容:

from pyspark.sql.functions import *

update_df_name = "updates"
excluded_columsn = ["col1", "col2"]
values_to_insert = dict([(cl, col(f"{update_df_name}.{cl}")) 
  for cl in df.columns if cl not in excluded_columnes])

deltaTable.alias("events").merge(
    source = updatesDF.alias(update_df_name),
    condition = expr("events.eventId = updates.eventId")
  ).whenNotMatchedInsert(
     values = values_to_insert
  ).execute()

字符串

相关问题