如何使用pySpark在s3中加速数据写入过程

wlwcrazw  于 5个月前  发布在  Spark
关注(0)|答案(2)|浏览(71)

我必须使用PySpark在S3中编写包含700000行的parquet文件。我在parquet文件中写入 Dataframe ,并使用overwrite方法将其保存在S3中。
为了在一个桶中写入一个 Dataframe ,我的代码需要50秒才能完成,我必须在桶中写入大约6000个 Dataframe 。我想优化我的代码,以便它最多需要30秒。
这是我的代码片段:

original_dataframe=spark.read.option('header', 'true').option('inferSchema','true').parquet("s3://"+bucketname+"/"+data_files_path)

distinct_df=original_df.distinct()
distinct_df_cache=distinct_df.cache() 
 

distinct_df_cache.coalesce(8).write.format("parquet")
.option("header","true").option("inferSchema","true")
.mode("overwrite").save("s3://"+bucketname+"/"+data_files_path)

字符串
我使用这个spark submit命令来运行它。

sudo spark-submit --master yarn --executor-memory 48G --driver-memory 30g --num-executors 8 --executor-cores 8 --conf spark.sql.shuffle.partitions=16 --conf spark.sql.parquet.mergeSchema=true --conf spark.sql.parquet.filterPushdown=true --conf spark.default.parallelism=90 --conf spark.sql.parquet.writeLegacyFormat=true my_Code.py


从阅读到在同一个桶路径中写入,整个过程平均需要80秒左右。
我可以做些什么来优化它,使它应该需要大约30秒来完成一个循环的整个过程。我必须一次性循环运行此代码10 000次。目前,它将需要80 X 10 000 = 800 000秒。它将运行一整天。
如果可能的话,我希望结果是40 X 10000 = 400 000秒来完成整个过程。

bqucvtff

bqucvtff1#

通常增加写入并行性可以提高速度。然后将合并和 Shuffle 并行性都增加到64(您的spark提交配置中有8 * 8个可用写入器)。
一个缺点是,你也会增加生产的s3文件的数量,但这不应该是一个大问题。

wnvonmuf

wnvonmuf2#

  1. option('inferSchema'. true)实际上是option(“通读所有源数据以确定每列的类型”)。建议:取消设置
    1.确保你使用的是s3优化的committer,避免重命名。参考你的Spark发行版的文档。
  2. python对于笔记本来说可能是一种很好的语言,但是在生产中它的效率可能会比较低。请考虑以下两种情况之一:
  • 尝试在SQL语句中执行
  • 用scala或者java写,把它放到classpath中,然后直接调用。

如果你真的想最大限度地提高性能,从spark-standalone开始,在担心分发之前快速获得,因为它更容易进行基准测试和分析。

相关问题