如何准确地创建一个高效和可重用的Databricks工作流,将原始SQL数据库转储到Delta Lake中。这里的一些困惑是为了实现以下最佳方法:
- 处理模式中的漂移(DB表中的列)=>对存储的表进行简单的覆盖可以吗?
- 捕获数据变化(CDC)并有效地合并现有数据;比如ID。这是否也与关系数据库相关?
- Delta Live Table(DLT)格式是否适合此情况?
可以想象以下过程:
1.遍历公共表information_schema:
table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
properties=connectionProperties) \
.filter("table_schema = 'public'") \
.select("table_name") \
.rdd.flatMap(lambda x: x) \
.collect()
for table in table_names:
...
字符串
1.对于每一张table:
- (A)如果Delta Lake表不存在(或者可能在模式方面过时),则创建新的Delta Lake表,否则;
- (B)将新数据/更新数据合并到Delta Lake中。
像Airbyte这样的第三方供应商和其他供应商提供了这项服务,并不是因为它真的很难实现,而更有可能是因为在Databases DLT/Delta湖畔上缺乏这种通用过程的文档或参考实现。
- 一个令人满意的答案是:(I)对OP中包含的(错误的?)假设的一些背景/验证,(II)此工作流程的缺失代码,以及(III)对提出的3点的回答/澄清。
1条答案
按热度按时间jxct1oxe1#
如果你的源数据库正在并发处理事务,那么(1)和(2)中的方法将不会产生表的一致副本,因为每个表的副本将不会来自相同的提交点。
您可以使用类似
pg_dump
的东西来获得数据库的一致转储,并将该文本转换为可以加载到spark DataFrames并保存的内容。这将在Delta Lake中给予一致的快照。再加上在pubsub(如Kafka)中捕获Postgres更改日志,甚至可以让您以流的方式保持它的最新状态,但实际上这样做是相当复杂的。