pyspark 如何从Postgres RDB到Databricks Lakehouse Delta Lake?

cunj1qz1  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(66)

如何准确地创建一个高效和可重用的Databricks工作流,将原始SQL数据库转储到Delta Lake中。这里的一些困惑是为了实现以下最佳方法:

  • 处理模式中的漂移(DB表中的列)=>对存储的表进行简单的覆盖可以吗?
  • 捕获数据变化(CDC)并有效地合并现有数据;比如ID。这是否也与关系数据库相关?
  • Delta Live Table(DLT)格式是否适合此情况?
    可以想象以下过程:

1.遍历公共表information_schema:

table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
                               properties=connectionProperties) \
                               .filter("table_schema = 'public'") \
                               .select("table_name") \
                               .rdd.flatMap(lambda x: x) \
                               .collect()

for table in table_names:
    ...

字符串
1.对于每一张table:

  • (A)如果Delta Lake表不存在(或者可能在模式方面过时),则创建新的Delta Lake表,否则;
  • (B)将新数据/更新数据合并到Delta Lake中。

像Airbyte这样的第三方供应商和其他供应商提供了这项服务,并不是因为它真的很难实现,而更有可能是因为在Databases DLT/Delta湖畔上缺乏这种通用过程的文档或参考实现。

  • 一个令人满意的答案是:(I)对OP中包含的(错误的?)假设的一些背景/验证,(II)此工作流程的缺失代码,以及(III)对提出的3点的回答/澄清。
jxct1oxe

jxct1oxe1#

如果你的源数据库正在并发处理事务,那么(1)和(2)中的方法将不会产生表的一致副本,因为每个表的副本将不会来自相同的提交点。
您可以使用类似pg_dump的东西来获得数据库的一致转储,并将该文本转换为可以加载到spark DataFrames并保存的内容。这将在Delta Lake中给予一致的快照。
再加上在pubsub(如Kafka)中捕获Postgres更改日志,甚至可以让您以流的方式保持它的最新状态,但实际上这样做是相当复杂的。

相关问题