scala 如何读取最新版本的Delta表的Parquet文件?

ukxgm1gy  于 8个月前  发布在  Scala
关注(0)|答案(1)|浏览(85)

我们有大约100 GB的大数据,存储在200个Parquet文件中。因此,为了保存时间,我们并行运行10个作业(每个作业阅读20个文件)。
但是,由于增量表在每个版本后维护历史记录,它增加了200多个Parquet文件。所以我们的阅读文件和推送数据的逻辑不起作用。有没有办法只读取最新版本的文件或只维护一个(最新)版本的文件?

directory_path = "abfss://path/<table>"

# Get the list of files within the directory
file_list = dbutils.fs.ls(directory_path)  
file_process = [ ]
for index, file_info in enumerate(file_list):
  if file_info.isFile():
        file_path = file_info.path
        file_name = file_info.path
        
        pattern = r'part-\d+'
        match = re.search(pattern, file_name)
        numeric_portion=0
        if match:
            file_name = match.group() 
            numeric_portion = file_name[5:]
            print(numeric_portion)
            
        if int(numeric_portion) >=0) and int(numeric_portion) <=20:
            // this way we read 20 files in each job
pgccezyw

pgccezyw1#

默认情况下使用spark,您将获得最新版本的数据。

spark.read.format("delta").load("/user/")

要获取文件路径,

spark.read.format("delta").load("/user/").select("*","_metadata.file_path")

如果你不想使用spark,你可以使用下面的代码来获取最新版本的文件。

import json
import pandas as pd
crcf = [i[0] for i in dbutils.fs.ls("user/_delta_log") if  ".crc"  in i[0]]
max_ver_path = "/"+max(crcf).replace(":","")
print(max_ver_path)
tmp = json.load(open(max_ver_path))
latest_paths = [ "user/"+i['path'] for i in tmp["allFiles"]]
latest_paths

使用pandas读取这些文件。

df = pd.concat([pd.read_parquet("/dbfs/"+f) for f in latest_paths], ignore_index=True)
df

你也可以把它转化为Spark。

spark.createDataFrame(df)

相关问题