spark联合大型csv文件并写入Parquet文件

eufgjt7s  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(260)

我有一个大的csv文件与相同的模式一串。我想合并这些文件,并将结果写入一个按列划分的Parquet文件 file_name .
以下是我迄今为止所做的:

children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
            spark.sparkContext.emptyRDD(),
            dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
    df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
    df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
    df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

问题是我越来越 java.lang.OutOfMemoryError: Java heap space 错误。
我试着做一个

df.repartition(3000, "file_name").write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

但也不管用。你能提出一个解决办法吗?

xmjla07d

xmjla07d1#

看起来hdfs位置中的文件太多了。因为有那么多文件 union 作为一个转变而不是一个行动的Spark,在试图建立物理计划时可能会耗尽记忆。
在任何情况下,如果您计划读取大量具有相同模式的csv文件,我将使用结构化流。为了避免手动编写模式,可以从示例文件中推断出来。
下面的代码显示了这个想法:


# define input and output paths and checkpoint location

csvPath = "hdfs:///landing/my_data/children_flow/"
parquetPath = "hdfs:///staging/my_data/children_flow/"
checkpointLoc = "hdfs:///path/to/dir/"

# infer Schema from one example csv file

schemaDf = spark.read.format("csv").option("inferSchema", true).load(csvPath + "first.csv")
schema = schemaDf.schema

# create Stream by reading from csv and writing to parquet

df = spark.readStream \
  .format("csv") \
  .schema(schema) \
  .load(csvPath) \
  .withColumn("file_name", split(input_file_name(), "/")(4))

query = df.writeStream \
  .format("parquet") \
  .outputMode("append") \
  .option("path", parquetPath) \
  .option("checkpointLocation", checkpointLoc) \
  .partitionBy("file_name") \
  .start() \
  .awaitTermination()

如果内存再次耗尽,可以将选项“maxfilespertrigger”设置为一个合适的数字。如果它创建了太多的Parquet文件,您最终可以只读取->重新分区->写入一次Parquet数据。

相关问题