pyspark 冰山架构不合并缺失的列

o2rvlv0m  于 3个月前  发布在  Spark
关注(0)|答案(1)|浏览(51)

我正在AWS胶水作业中使用以下代码创建一个Iceberg表:

df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
    .using('iceberg') \
    .tableProperty("location", TABLE_LOCATION) \
    .tableProperty("write.spark.accept-any-schema", "true") \
    .tableProperty("format-version", "2") \
    .createOrReplace()

字符串
创建了表,我可以在Glue/LF中看到它,我可以在Athena中查询它。
我有另一个工作,正在尝试使用以下方法来upsert数据:

df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condidtion}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)


The Ghosts Job fails and says:

AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...


当列可能丢失或列可能增加时,我如何合并新数据。我认为Iceberg将通过为丢失/新列填充NULL来处理这个问题,因为我设置了“write.spark.accept-any-schema”= true。谢谢。
运行Spark版本3.3.0-amzn-1
AWS Gandroid Job v4
冰山v1.0.0

wwtsj6pe

wwtsj6pe1#

根据文件:
编写器必须启用mergeSchema选项。
第一个月
这在目前的spark.sql("MERGE ...")中是无法实现的。
有一个开放的“功能请求”issue来处理这个问题。
一个“非最佳”解决方案是检测是否在源中找到列而在目标中尚未找到,然后在MERGE语句之前执行do和ALTER TABLE target ADD COLUMN。️️🤷‍️🤷‍🤷‍

相关问题