spark:保存到hdfs时出现内存不足错误

55ooxyrt  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(608)

当我将大数据保存到hdfs时,我遇到了oome

val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())
val rdd = textfile.filter(row => {
    if (row.endsWith(",")) {
        accumulableCollection += row
        false
    } else if (row.length < 100) {
        accumulableCollection += row
        false
    }
    valid
})
rdd.cache()
val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
    var valid = true
    for((k,v) <- fieldsMap if valid ) {
        if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
            accumulableCollection += row.mkString(",")
            valid = false
        }
    }
    valid
})
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)

我在spark submit中使用这个:

--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2

以下是日志的输出:

15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure:**Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)**- reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

序列化任务8:0为30617206字节,超出了允许的最大值:spark.akka.framesize(10485760字节)--(1)30mb序列化任务是什么?
考虑对大值使用广播变量。--(2) 广播变量应该是什么?rdd2?或者accumulablecollection,因为这是我写给hdfs的?
当我增加framesize时,错误是:java.lang.outofmemoryerror:java heap space,所以我必须将驱动程序内存和执行程序内存增加到2g才能工作。如果accumulablecollection.value.length是500000,我需要使用3g。这正常吗?
该文件只有146mb,包含200000行(2g内存)(在hdfs中,它分为2个分区,每个分区包含73mb的内存)

kqhtkvqz

kqhtkvqz1#

它的意思和它说的差不多。您正在尝试序列化一个非常大的对象。你应该重写你的代码来避免这样做。
例如,我不清楚为什么您要尝试更新一个可累积的集合,并在 filter ,甚至可以执行多次。然后缓存rdd,但您已经尝试在驱动程序上为其创建副本了吗?然后将其他值添加到本地集合,然后再将其转换为rdd?
为什么要收集?只需操作RDD。这里有很多冗余。

8dtrkrch

8dtrkrch2#

spark中的中央编程抽象是rdd,您可以通过两种方式创建它们:
(1) 并行化驱动程序中的现有集合,或(2)引用外部存储系统中的数据集,例如共享文件系统、hdfs、hbase或任何提供hadoop inputformat的数据源。
这个 parallelize() 方法(1)要求将整个数据集存储在一台机器上(第26页)。
方法(2)称为外部数据集,应该用于大型文件。
下一行使用 accumulableCollection.value 并且需要安装在一台机器上:

sc.parallelize(accumulableCollection.value)

缓存rdd时也可能会超出内存:

rdd.cache()

这意味着整个 textfile rdd存储在内存中。你很可能不想这样做。有关为数据选择缓存级别的建议,请参阅spark文档。

相关问题