我们有大约100 GB的大数据,存储在200个Parquet文件中。因此,为了保存时间,我们并行运行10个作业(每个作业阅读20个文件)。
但是,由于增量表在每个版本后维护历史记录,它增加了200多个Parquet文件。所以我们的阅读文件和推送数据的逻辑不起作用。有没有办法只读取最新版本的文件或只维护一个(最新)版本的文件?
directory_path = "abfss://path/<table>"
# Get the list of files within the directory
file_list = dbutils.fs.ls(directory_path)
file_process = [ ]
for index, file_info in enumerate(file_list):
if file_info.isFile():
file_path = file_info.path
file_name = file_info.path
pattern = r'part-\d+'
match = re.search(pattern, file_name)
numeric_portion=0
if match:
file_name = match.group()
numeric_portion = file_name[5:]
print(numeric_portion)
if int(numeric_portion) >=0) and int(numeric_portion) <=20:
// this way we read 20 files in each job
1条答案
按热度按时间pgccezyw1#
默认情况下使用spark,您将获得最新版本的数据。
要获取文件路径,
如果你不想使用spark,你可以使用下面的代码来获取最新版本的文件。
使用pandas读取这些文件。
你也可以把它转化为Spark。