Spark: Cannot convert this array to unsafe format as it's too big

kt06eoxx · posted 2021-05-27 in Spark

I have a matrix like this:

0.0  0.4  0.4  0.0 
0.1  0.0  0.0  0.7 
0.0  0.2  0.0  0.3 
0.3  0.0  0.0  0.0
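
For reference, this matrix can be built as a Spark dense matrix; note that Matrices.dense takes its values in column-major order (a sketch, the variable name is mine):

import org.apache.spark.ml.linalg.Matrices

// Matrices.dense expects values in column-major order
val matrix = Matrices.dense(4, 4, Array(
  0.0, 0.1, 0.0, 0.3,   // column 0
  0.4, 0.0, 0.2, 0.0,   // column 1
  0.4, 0.0, 0.0, 0.0,   // column 2
  0.0, 0.7, 0.3, 0.0))  // column 3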

I want to write it to HDFS, imitating the save logic in the source code of Spark's LogisticRegression, so my code is as below:

// Mirrors the private Data case class in LogisticRegression's model writer
private case class Data(unigram: Array[Double],
                        interceptVector: Matrix)

val data = Data(unigram.value, denseVector)  // unigram is apparently a Broadcast[Array[Double]]
val df = sparkSession.createDataFrame(Seq(data))
df.repartition(1).write.mode("overwrite").parquet(bigramPath)

Everything works fine when the matrix is small, but when it is large, Spark throws the error below:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot convert this array to unsafe format as it's too big.
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:447)
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:487)
    at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:66)
    at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:28)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:143)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$$anonfun$fromProduct$1.apply(LocalRelation.scala:42)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$.fromProduct(LocalRelation.scala:42)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:315)
    at com.wps.NgramModel2.save(Ngram2.scala:119)
    at com.wps.NgramDemo2$.main(NgramDemo2.scala:24)
    at com.wps.NgramDemo2.main(NgramDemo2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
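
The root cause is a hard size check in UnsafeArrayData.fromPrimitiveArray: Catalyst's unsafe format addresses an array with a 32-bit byte offset, so the serialized header plus data must fit within Integer.MAX_VALUE bytes. MatrixUDT serializes the entire matrix as a single Array[Double], so a dense n × n matrix of doubles overflows once 8 * n * n bytes approaches 2 GB, i.e. around n ≈ 16,000. A rough sketch of the check (simplified from Spark's source, not the exact code):

// Approximation of the limit enforced by UnsafeArrayData.fromPrimitiveArray:
// an 8-byte element count, a null bitmap, then 8 bytes per double,
// all of which must fit within Integer.MAX_VALUE bytes.
def fitsInUnsafeArray(numElements: Long, elementSize: Long = 8L): Boolean = {
  val headerInBytes = 8L + ((numElements + 63) / 64) * 8
  headerInBytes + numElements * elementSize <= Int.MaxValue
}

val n = 17000L
println(fitsInUnsafeArray(n * n))  // false: 8 * 17000 * 17000 ≈ 2.3 GB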

What should I do? Split the matrix and write it out piece by piece, or is there a better way?
I found that the approach below works:

import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FSDataOutputStream
import breeze.linalg.JavaArrayOps

// Convert the breeze DenseMatrix to Array[Array[Double]] and render each row as text
val bigramArray = JavaArrayOps.dmDToArray2(bigram)
val lines: Array[String] = bigramArray.map(line => line.mkString(" "))

val hadoopConf: Configuration = new Configuration
// HDFSUtil is my own helper for opening an output stream on HDFS
val outputStream: FSDataOutputStream =
  HDFSUtil.getFSDataOutputStream(bigramPath, "/part-00000", hadoopConf)
val bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))

try {
  lines.foreach(line => bufferedWriter.write(line + "\n"))
} finally {
  bufferedWriter.close()  // also flushes and closes the underlying stream
}
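
To read such a text file back into memory, here is a minimal sketch using the plain Hadoop FileSystem API (HDFSUtil above is the asker's own helper; loadMatrix below is a hypothetical name):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Parse the space-separated text file back into Array[Array[Double]]
def loadMatrix(path: String, conf: Configuration = new Configuration): Array[Array[Double]] = {
  val fs = FileSystem.get(conf)
  val in = fs.open(new Path(path))
  try {
    Source.fromInputStream(in).getLines()
      .map(_.trim.split("\\s+").map(_.toDouble))
      .toArray  // materialize before closing the stream
  } finally {
    in.close()
  }
}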

zf2sa74q · Answer 1

IIUC, you want to save the matrix and load it back as part of your own Estimator.

Below is code that saves a dummy matrix and loads it back:

import org.apache.hadoop.fs.Path
import org.apache.spark.ml.linalg.{Matrices, Matrix}
import org.apache.spark.sql.Row

case class Data(matrix: Matrix)

def save(matrix: Matrix, path: String): Unit = {
  val data = Data(matrix)
  val df = spark.createDataFrame(Seq(data))
  val dataPath = new Path(path, "data").toString
  df.repartition(1).write.mode("overwrite").parquet(dataPath)
}

def load(path: String): Matrix = {
  val dataPath = new Path(path, "data").toString
  val df = spark.read.parquet(dataPath)
  val Row(matrix: Matrix) = df.select("matrix").head()
  matrix
}

Testing it with a 3 × 3 identity matrix:

println("### input matrix ###")
    val matrixToSave = Matrices.eye(3)
    println(matrixToSave)
    save(matrixToSave, "/path/models/matrix")
    val matrixLoaded = load("/path/models/matrix")
    println("### Loaded matrix ###")
    println(matrixLoaded)

Output:


### input matrix ###

1.0  0.0  0.0  
0.0  1.0  0.0  
0.0  0.0  1.0  

### Loaded matrix ###

1.0  0.0  0.0  
0.0  1.0  0.0  
0.0  0.0  1.0
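
Note that this save still serializes the whole matrix as one array, so a sufficiently large matrix hits the same limit. One way to split it, as the question suggests, is to store one matrix row per DataFrame record so that no single serialized array gets near the 2 GB cap. A minimal sketch (assuming the matrix is available as Array[Array[Double]] and that spark is a SparkSession in scope; the names are mine):

import org.apache.spark.ml.linalg.{Vector, Vectors}

// One record per matrix row: each serialized array is only numCols doubles
case class MatrixRow(idx: Int, row: Vector)

def saveByRows(rows: Array[Array[Double]], path: String): Unit = {
  val data = rows.zipWithIndex.map { case (r, i) => MatrixRow(i, Vectors.dense(r)) }
  spark.createDataFrame(data.toSeq).write.mode("overwrite").parquet(path)
}

def loadByRows(path: String, numRows: Int): Array[Array[Double]] = {
  val out = new Array[Array[Double]](numRows)
  spark.read.parquet(path).collect().foreach { r =>
    out(r.getAs[Int]("idx")) = r.getAs[Vector]("row").toArray
  }
  out
}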

Hope it helps!
