我正在有条件地加载文件,想知道执行以下操作的最佳方法是什么:
如果文件存在,请将其读入。
迭代地这样做,使得每个现有文件都被添加到sparkDataframe中。
结果将是一个大的sparkDataframe,其中包含所提供路径列表中的所有现有文件。
这是我所拥有的,但它不能正常工作。
for path in list_of_paths:
if s3.exists(path):
sdf = spark.read.format("com.databricks.spark.avro").load(path)
final_sdf = sdf.union(sdf)
另外一个挑战是,这些文件变化非常快,因此,如果我对文件存在性执行初始检查,在加载它们时,它们可能在技术上不再存在。
因此,操作的顺序需要是检查文件是否存在,如果文件有效,则立即加载。
1条答案
按热度按时间hc8w905p1#
您可以将Dataframe收集到一个列表中,并使用
reduce
以及unionAll
要将它们组合到单个Dataframe中,请执行以下操作:或者您可以过滤列表并将整个列表提供给spark reader,这样效率会更高: