使用Glue连接和Spark Scala覆盖Mysql表

4ngedf3f  于 8个月前  发布在  Scala
关注(0)|答案(2)|浏览(95)

是否可以使用预定义的连接在AWS胶水作业中执行预操作查询?或者如何使用glueContext.getJDBCSink覆盖mysql表中的数据?
我尝试执行的代码是

val datasink4 = glueContext.getJDBCSink(
catalogConnection = "xxxxx_mysql",
options = JsonOptions(
"""{"dbtable": "xxxxx.role_code_se", 
"database": "xxxxx", 
"preactions": "TRUNCATE TABLE xxxxx.role_code_se;", 
"overwrite": "true"}"""
), 
redshiftTmpDir = "", transformationContext = "datasink4"
).writeDynamicFrame(new_dynamic_frame)

但它不起作用。它忽略overwrite和truncate选项并抛出错误
java.sql.BatchUpdateException:关键字“ix_role_code_se_role_code”的重复条目“31”,位于

ubbxdtey

ubbxdtey1#

Glue只允许使用redshift的preaction和postaction,而不支持其他数据库。如果你想覆盖表,那么将dynamicframe转换为dynamicframe,然后使用如下方法:

df.write.option("truncate", "true").jdbc(url=DATABASE_URL, table=DATABASE_TABLE, mode="overwrite", properties=DATABASE_PROPERTIES)

参考this了解更多关于spark jdbc选项的信息,这是示例。

xsuvu9jc

xsuvu9jc2#

基于Prabhakar的回答(使用toDF()将DynamicFrame转换为Spark DataFrame),您可以获得Glue连接的JDBC选项,如下所示:

# This is the data that will be written to the sink
dynamic_frame = glueContext.create_data_frame.from_catalog(
    database="your_database_name",
    table_name="your_table_name",
)

# Obtain JDBC configurations for Glue connection
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_connection_name")

# Convert the DynamicFrame to a Spark DataFrame
df = dynamic_frame.toDF()

# Use Spark DataFrame's truncate & overwrite functionality
# to update target JDBC table using Glue connection info:
df.write.option("truncate", "true").jdbc(
    url=jdbc_conf["fullUrl"],
    table="your_jdbc_table_name",
    mode="overwrite",
    properties={"user": jdbc_conf["user"], "password": jdbc_conf["password"]}
)

相关问题