pyspark Spark-磁盘溢出和内存溢出问题

w3nuxt5m  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(129)

我是一个新的Spark和工作的逻辑连接13个文件,并写入到一个blob存储的最终文件。输入文件是CSV格式和输出被写入为Parquet。出13个文件,file 1是950 mb,file 2是50 mb,file 3是150 mb,file 4是620 mb,file 5是235 mb,file6&7是小于1 mb,file 8是70 mb,其余都是kbs.没有其他计算正在执行,但我看到2GB的溢出内存.如何避免这种内存溢出?
我做了一些调查,了解了AQE,它在最新的spark版本中默认启用,AQE负责分区和合并,所以在 Shuffle 后没有200个分区。在我的情况下,最终写入的文件有13个parquet分区,每个大约60 mb。
其次,我重点介绍了启用AQE的连接类型。我看到AQE对除file 1、file 4和file 5之外的所有连接都进行了广播。这三个连接是用Sortmergejoin执行的,其余的是用BroadcastHashjoin执行的。所以AQE在选择正确的连接方面发挥了作用。仍然存在内存溢出。
接下来,我开始阅读如何减少内存溢出。我知道当分区大小大于spark memory可以处理的大小时,会发生内存溢出,导致数据溢出到磁盘。这种溢出发生在join中的shuffle阶段,一个建议是通过bucket数据来避免shuffle。所以我开始bucket file 1,file 4和file 5的连接键(这是相同的3个文件),并将它们分别分配到10个bucket中。DAG结果显示,这次没有shuffle,但仍然存在内存溢出,大小相同。
我知道我的文件不是很大,但这是一个月的数据,我的下游应用程序将处理更大的文件,因为我添加了更多的月度数据。
我使用的是standard_D3_v2,14 GB,4核,工作进程范围为2-8。这个对13个文件的连接操作使用了6个工作进程,其中只有2个工作进程被使用,直到连接阶段结束,并且为了写入输出文件,它使用了4个工作进程(根据事件时间轴观察到这一点)。
有没有人有任何建议,如何避免这种溢出的记忆?

ijnw1ujt

ijnw1ujt1#

为了避免溢出,您可以选择(或两者):
1.增加分区数(增加默认的并行度)。你将得到更小的Parquet文件作为附带效果。
1.增加执行器内存(和/或减少每个执行器的核心数量)

相关问题