使用delta lake中的spark更新现有行,而不影响另一个作业同时写入的数据

drkbr07n  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(202)

目标:我想用一个定期运行的spark作业a更新delta lake表中的一个现有列,同时能够运行另一个定期运行的spark作业b来添加新数据,而不会丢失数据。
问题:据我所知,我需要使用savemode.overwrite来更新现有数据。但是,如果在添加新数据的同时运行另一个spark作业,这将导致数据丢失,因为覆盖作业所使用的增量表版本在实际覆盖发生时可能已过期。在我的测试中,这导致了新数据的数据丢失。
有没有一种方法可以在不使用第二个表来更新数据的情况下实现我的目标?我知道有可能使用第二张经过浓缩和消毒的手术台,但现在我宁愿在一张手术台上做手术。
spark更新现有数据的作业类似于:

// Code is in Kotlin not Scala, but that shouldn't matter
spark
        .read()
        .format("delta")
        .load("path-to-table")
        .withColumn("enriched", "enriched_data") // or any other operation altering the state
        .write()
        .format("delta")
        .mode("overwrite")
        .save("path-to-table")

spark作业是一种结构化的流式作业,它将新数据附加到表中。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题