如何执行从pyspark框架到azure sql数据库表的upsert(插入+更新)?

xqkwcwgp  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(47)

我正在尝试做一个从pyspark数组到sql表的upsert。
sparkdf是我的pyspark框架。Test是我在azure sql数据库中的sql表。
到目前为止,我有以下内容:

def write_to_sqldatabase(final_table, target_table):
    #Write table data into a spark dataframe
    final_table.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", f'....{target_table}') \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("append") \
        .save()

字符串

spark.sql("""
merge target t
using source s
on s.Id = t.Id
when matched then 
update set *
when not matched then insert *
""")


jdbc_url = f"jdbc:sqlserver://{SERVER};database={DATABASE};user={USERNAME};password={PASSWORD}"
sparkdf.createOrReplaceTempView('source')
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(merge into target t using source s on s.Id = t.Id when matched then  update set * when not matched then insert *) AS subquery") \
    .load()


后者不起作用,因为azure sql server似乎不支持 *。我想你必须声明列和值而不使用 *。但是我想动态地这样做,因为我有很多列和很多不同的表,我想为它们做upsert。
我尝试了不同的选择,但到目前为止没有任何效果。

yiytaume

yiytaume1#

根据这个https://issues.apache.org/jira/browse/SPARK-19335 Spark框架编写器API没有这样的jdbc功能。必须推出自己的解决方案。

相关问题