如何根据文件是否存在加载文件,然后迭代地将其添加到sparkDataframe中,以便跳过不存在的文件?

rqdpfwrv  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(263)

我正在有条件地加载文件,想知道执行以下操作的最佳方法是什么:
如果文件存在,请将其读入。
迭代地这样做,使得每个现有文件都被添加到sparkDataframe中。
结果将是一个大的sparkDataframe,其中包含所提供路径列表中的所有现有文件。
这是我所拥有的,但它不能正常工作。

for path in list_of_paths:
    if s3.exists(path):
        sdf = spark.read.format("com.databricks.spark.avro").load(path)
final_sdf = sdf.union(sdf)

另外一个挑战是,这些文件变化非常快,因此,如果我对文件存在性执行初始检查,在加载它们时,它们可能在技术上不再存在。
因此,操作的顺序需要是检查文件是否存在,如果文件有效,则立即加载。

hc8w905p

hc8w905p1#

您可以将Dataframe收集到一个列表中,并使用 reduce 以及 unionAll 要将它们组合到单个Dataframe中,请执行以下操作:

from functools import reduce

sdf_list = [spark.read.format("com.databricks.spark.avro").load(path) for path in list_of_paths if s3.exists(path)]
final_sdf = reduce(lambda a, b: a.unionAll(b), sdf_list)

或者您可以过滤列表并将整个列表提供给spark reader,这样效率会更高:

paths = [path for path in list_of_paths if s3.exists(path)]
final_sdf = spark.read.format("com.databricks.spark.avro").load(paths)

相关问题