sparkjavapca:java堆空间和shuffle丢失的输出位置

bmvo0sr5  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(390)

我尝试在4.827行和40.107列的Dataframe上执行pca,但是我遇到了java堆空间错误,并丢失了shuffle的输出位置(根据executors上的sdterr文件)。错误发生在“行矩阵的TreeAgregate”期间。scala:122“pca阶段。
集群
它是一个独立的集群,有16个工作节点,每个节点有1个执行器,4个内核和21.504mb内存。主节点有15g内存,我用“java-jar-xmx15g myapp.jar”给出。另外,“spark.sql.shuffle.partitions”是192,“spark.driver.maxresultsize”是6g。
简化代码

df1.persist (From the Storage Tab in spark UI it says it is 3Gb)
df2=df1.groupby(col1).pivot(col2).mean(col3) (This is a df with 4.827 columns and 40.107 rows)
df2.collectFirstColumnAsList
df3=df1.groupby(col2).pivot(col1).mean(col3) (This is a df with 40.107 columns and 4.827 rows)

-----it hangs here for around 1.5 hours creating metadata for upcoming dataframe-----

df4 = (..Imputer or na.fill on df3..)
df5 = (..VectorAssembler on df4..)
(..PCA on df5 with error Missing output location for shuffle..)
df1.unpersist

我见过并尝试过许多解决办法,但没有任何结果。其中:
将df5或df4重新分区为16、64、192、256、1000、4000(尽管数据看起来不倾斜)
将spark.sql.shuffle.partitions更改为16、64、192、256、1000、4000
每个执行器使用1和2个内核,以便为每个任务提供更多内存。
有两个执行者,两个核心或四个核心。
将“spark.memory.fraction”更改为0.8,将“spark.memory.storagefraction”更改为0.4。
总是同样的错误!怎么可能把这些记忆都吹走??有没有可能df实际上不适合内存?请让我知道如果你需要任何其他信息或打印屏幕。
编辑1
我将集群更改为2个spark worker,每个spark.sql.shuffle.partitions=48有1个执行器。每个执行器有115g和8个核心。下面是我加载文件(2.2gb)的代码,将每一行转换为一个密集向量,并将其提供给pca。
文件中的每一行都有这种格式(4.568行,每行有40.107个双倍值):

"[x1,x2,x3,...]"

代码是:

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");
StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",new VectorUDT(),false,Metadata.empty())
            });
Dataset<Row> df = df1.map((Row originalrow) -> {
                    String yoho =originalrow.get(0).toString();
                    int sizeyoho=yoho.length();
                    String yohi = yoho.substring(1, sizeyoho-1);
                    String[] yi = yohi.split(",");
                    int s = yi.length;
                    double[] tmplist= new double[s];
                    for(int i=0;i<s;i++){
                        tmplist[i]=Double.parseDouble(yi[i]);
                    }

                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            }, RowEncoder.apply(schema2));
PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);

我在两个工人中的一个的标准上得到的确切错误是:

ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 43)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这是sparkui的stages标签:

这就是失败的阶段(rowmatrix的treeaggreegate)。scala:122):

编辑2


编辑3
我读取了整个文件,但每行只取10个值,然后创建密集向量。我还是会犯同样的错误!我有一个主内存235g和3个工人(1个执行器,每个有4个核心)和64g内存每个执行器。怎么会这样(别忘了文件的总大小只有2.3gb!)

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");

StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",new VectorUDT(),false,Metadata.empty())
            });
Dataset<Row> df = df1.map((Row originalrow) -> {
                    String yoho =originalrow.get(0).toString();
                    int sizeyoho=yoho.length();
                    String yohi = yoho.substring(1, sizeyoho-1);
                    String[] yi = yohi.split(",");//this string array has all 40.107 values
                    int s = yi.length;
                    double[] tmplist= new double[s];
                    for(int i=0;i<10;i++){//I narrow it down to take only the first 10 values of each row
                        tmplist[i]=Double.parseDouble(yi[i]);
                    }
                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            }, RowEncoder.apply(schema2));

PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);
ewm0tg9j

ewm0tg9j1#

当spark应用程序执行大的shuffle阶段时,就会出现“shuffle缺少输出位置”的情况,它试图在执行者之间重新分配大量数据,并且集群网络中存在一些问题。
斯帕克说你不知道´在某个阶段没有记忆。您正在进行需要不同阶段的转换,它们也会消耗内存。此外,您应该首先持久化Dataframe,并且应该检查存储级别,因为您有可能持久化在内存中。
您正在链接几个spark范围内的转换:例如,在执行第一个pivot阶段时,spark会创建一个阶段,并对列执行无序排列以分组,可能您的数据有偏差,并且有一些执行器比其他执行器消耗更多内存,可能其中一个执行器中会发生错误。
除了Dataframe变换外,pca估计器还将Dataframe转换成rdd,增加更多的内存来计算协方差矩阵,并且可以处理nxn元素的breeze矩阵的密集表示。例如,svd是用微风制成的。这给其中一个遗嘱执行人带来了很大的压力。
也许您可以将结果Dataframe保存在hdfs(或其他什么)中,然后在另一个spark应用程序中执行pca。
主要问题。在去奇异值分解之前,算法需要计算文法矩阵,并使用rdd中的树形网格。这将创建一个非常大的双矩阵,它将被发送到驱动程序,这是一个错误,因为您的驱动程序没有´我的记性不够。你需要大大增加司机的记忆。你有网络错误,如果一个执行者失去了连接,作业就会崩溃´不要试图重新执行。
就我个人而言,我会尝试直接在驱动程序中的breeze(或smile)中执行pca,我的意思是,收集rdd字段,因为数据集比协方差矩阵小得多,并使用浮点表示手动执行。
仅使用breeze(既不使用spark也不使用treeagregation)计算pca的代码:

import breeze.linalg._
import breeze.linalg.svd._

object PCACode {

  def mean(v: Vector[Double]): Double = v.valuesIterator.sum / v.size

  def zeroMean(m: DenseMatrix[Double]): DenseMatrix[Double] = {
    val copy = m.copy
    for (c <- 0 until m.cols) {
      val col = copy(::, c)
      val colMean = mean(col)
      col -= colMean
    }
    copy
  }

  def pca(data: DenseMatrix[Double], components: Int): DenseMatrix[Double] = {
    val d = zeroMean(data)
    val SVD(_, _, v) = svd(d.t)
    val model = v(0 until components, ::)
    val filter = model.t * model
    filter * d
  }

  def main(args: Array[String]) : Unit = {
    val df : DataFrame = ???

    /**Collect the data and do the processing. Convert string to double, etc**/
    val data: Array[mutable.WrappedArray[Double]] =
      df.rdd.map(row => (row.getAs[mutable.WrappedArray[Double]](0))).collect()

    /**Once you have the Array, create the matrix and do the PCA**/
    val matrix = DenseMatrix(data.toSeq:_*)
    val pcaRes = pca(matrix, 2)

    println("result pca \n" + pcaRes)
  }
}

这个代码会在驱动程序中做pca,检查内存。如果它崩溃了,可能是因为浮动精度。

相关问题