我必须使用PySpark在S3中编写包含700000行的parquet文件。我在parquet文件中写入 Dataframe ,并使用overwrite方法将其保存在S3中。
为了在一个桶中写入一个 Dataframe ,我的代码需要50秒才能完成,我必须在桶中写入大约6000个 Dataframe 。我想优化我的代码,以便它最多需要30秒。
这是我的代码片段:
original_dataframe=spark.read.option('header', 'true').option('inferSchema','true').parquet("s3://"+bucketname+"/"+data_files_path)
distinct_df=original_df.distinct()
distinct_df_cache=distinct_df.cache()
distinct_df_cache.coalesce(8).write.format("parquet")
.option("header","true").option("inferSchema","true")
.mode("overwrite").save("s3://"+bucketname+"/"+data_files_path)
字符串
我使用这个spark submit命令来运行它。
sudo spark-submit --master yarn --executor-memory 48G --driver-memory 30g --num-executors 8 --executor-cores 8 --conf spark.sql.shuffle.partitions=16 --conf spark.sql.parquet.mergeSchema=true --conf spark.sql.parquet.filterPushdown=true --conf spark.default.parallelism=90 --conf spark.sql.parquet.writeLegacyFormat=true my_Code.py
型
从阅读到在同一个桶路径中写入,整个过程平均需要80秒左右。
我可以做些什么来优化它,使它应该需要大约30秒来完成一个循环的整个过程。我必须一次性循环运行此代码10 000次。目前,它将需要80 X 10 000 = 800 000秒。它将运行一整天。
如果可能的话,我希望结果是40 X 10000 = 400 000秒来完成整个过程。
2条答案
按热度按时间bqucvtff1#
通常增加写入并行性可以提高速度。然后将合并和 Shuffle 并行性都增加到64(您的spark提交配置中有8 * 8个可用写入器)。
一个缺点是,你也会增加生产的s3文件的数量,但这不应该是一个大问题。
wnvonmuf2#
1.确保你使用的是s3优化的committer,避免重命名。参考你的Spark发行版的文档。
如果你真的想最大限度地提高性能,从spark-standalone开始,在担心分发之前快速获得,因为它更容易进行基准测试和分析。