我有5个文件,每个文件包含大小为
File1=~500KB
File2=~1MB
File3=~1GB
File4=~6GB
File5=~1GB
我使用wholetextfile读取所有5个文件。每个文件有不同的列数。
* val data = sc.wholeTextFiles("..........Path......./*")
在进一步的分析中,我发现我的代码在下面一行之后无法工作..在这种情况下,有没有关于如何使用mappartition的建议
val files = data.map { case (filename, content) => filename}
files.collect.foreach( filename => {
..../Performing some operations/...
})*
因此,当我尝试在服务器上提交这段代码时,它会给出错误java.lang.outofmemoryerror,当我从源路径中删除6gb文件时,它可以正常工作。所以只有大文件才有问题。我正在使用下面的spark提交代码。。
* spark-submit --class myClassName \
--master yarn-client --conf spark.executor.extraJavaOptions="-
Dlog4j.configuration=log4j.properties" \
--conf spark.driver.extraJavaOptions="-Dlog4j.configuration=...FilePath.../log4j.properties" \
--files ...FilePath.../log4j.properties --num-executors 4 --executor-cores 4 \
--executor-memory 10g --driver-memory 5g --conf "spark.yarn.executor.memoryOverhead=409" \
--conf "spark.yarn.driver.memoryOverhead=409" .................JarFilePath.jar*
Sparkversion:1.6.0 scala 版本:2.10.5
1条答案
按热度按时间vatpfxk51#
我假设您使用wholetextfile而不是textfile,因为“每个文件有不同的列数”(注意:在这种情况下,textfile的内存需求较小,因此您可以在不增加executor内存(executor memory)的情况下使此代码正常工作。基本上,模式在文件之间没有对齐。如果最终结果与模式无关(即具有相同的列数),则可以通过在每个文件上启动spark作业来实现预处理层,该作业使用textfile输出具有相同内容、列数的所需内容。
否则,您可以过滤掉大文件,并在这些文件上启动单独的spark作业,将它们分割成更小的文件。那样你就会记忆犹新。