pyspark Spark增量表更新

c6ubokkw  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(776)

我在Microsoft Azure数据库环境中使用sparksql和pyspark工作。因此,我在湖上有一个增量表,其中的数据被分区,例如:file_date。每个分区包含每天存储数百万条记录的文件,这些记录没有主键/唯一键。所有这些记录都有一个“status”列,该列可以包含值NULL(如果该特定记录上的所有内容都正常)或Not null(比如说,如果没有找到特定列的特定查找Map)。另外,我的过程包含另一个称为“Map”的文件夹,它会定期刷新,为了简单起见,我们说每夜,从哪里找到Map。
每天很有可能会有大约100~200行出错(状态列包含非空值)。从这些文件中,每天(因此是按file_date分区),下游作业取出所有有效记录并将其发送以进行进一步处理,忽略那100-200个出错记录,正在等待接收正确的Map文件。除了有效的状态记录外,下游作业还应尝试查看是否为出错的记录找到Map,如果存在,还应进一步将其删除(当然,在用适当的Map和状态更新数据湖之后)。
最好的方法是先用正确的Map直接更新delta表/lake,然后更新status列,写上“available_for_reprocessing”和我的下游作业,拉取当天的有效数据+拉取“available_for_reprocessing”数据,处理后,用“processed”状态更新回来。但使用delta这似乎超级难。
我正在查看“https://docs.databricks.com/delta/delta-update.html“,其中的更新示例只是针对要更新的常量的简单更新,而不是针对来自多个表的更新。
另一个但效率最低的方法是,提取所有数据(已处理的和出错的),获取错误记录的Map,并使用replaceWhere选项将 Dataframe 写回增量湖。(数百万条记录)并写回所有内容只是为了处理比如说最多1000条记录。如果你在“https://docs.databricks.com/delta/delta-update.html“搜索deltaTable = DeltaTable.forPath(spark, "/data/events/"),提供的例子是非常简单的更新。2没有唯一键,也不可能更新特定的记录。3有人能帮忙吗?
我使用了pyspark,或者可以使用sparksql,但我迷路了

anauzrmj

anauzrmj1#

如果要更新1列('status'),条件是对于以前不正确的行,所有查找现在都正确(其中'status'目前是不正确的),我认为UPDATE命令沿着EXISTS可以帮助您解决这个问题。它没有在更新文档中提到,但它对删除和更新操作都有效,从而有效地允许您更新/删除连接记录。
对于您的场景,我相信sql命令看起来应该是这样的:

UPDATE your_db.table_name AS a 
SET staus = 'correct'
  WHERE EXISTS 
  (
    SELECT * 
    FROM your_db.table_name AS b 
    JOIN lookup_table_1 AS t1 ON t1.lookup_column_a = b.lookup_column_a
    JOIN lookup_table_2 AS t2 ON t2.lookup_column_b = b.lookup_column_b
    -- ... add further lookups if needed
    WHERE
    b.staus = 'incorrect' AND
    a.lookup_column_a = b.lookup_column_a AND 
    a.lookup_column_b = b.lookup_column_b
  )
wgx48brx

wgx48brx2#

合并成功了。
合并到部门增量作为主部门使用更新的部门位置作为更新部门ON更新部门.dno =主部门.dno当匹配时,则更新设置主部门.dname =更新部门.更新的名称,主部门.位置=更新部门.更新的位置

相关问题