Is Spark rdd.toDF() thread-safe?

z8dt9xmd · posted 2021-07-14 · in Spark

I have an application running on a spark-2.4.4 cluster that uses rdd.toDF() and then writes the result to files.
To make better use of worker resources, the application runs the jobs in multiple threads. The snippet looks like this:

    import spark.implicits._

    Seq(rdd1, rdd2).par.foreach { rdd =>
      rdd.toDF().write.text("xxx")
    }

I found that toDF() is not thread-safe. The application sometimes fails with java.lang.UnsupportedOperationException.
You can reproduce it with the snippet below (the failure rate is around 1%, and it is more likely to happen when the case class has a large number of fields):

    package example

    import org.apache.spark.sql.SparkSession

    object SparkToDF {

      case class SampleData(
                             field_1: Option[Int] = None,
                             field_2: Option[Long] = Some(0L),
                             field_3: Option[Double] = None,
                             field_4: Option[Float] = None,
                             field_5: Option[Short] = None,
                             field_6: Option[Byte] = None,
                             field_7: Option[Boolean] = Some(false),
                             field_8: Option[StructData] = Some(StructData(Some(0), Some(0L), None, None, None, None, None)),
                             field_9: String = "",
                             field_10: String = "",
                             field_11: String = "",
                             field_12: String = "",
                             field_13: String = "",
                             field_14: String = "",
                             field_15: String = "",
                             field_16: String = "",
                             field_17: String = "",
                             field_18: String = "",
                             field_19: String = "",
                             field_20: String = "",
                             field_21: String = "",
                             field_22: String = "",
                             field_23: String = "",
                             field_24: String = "",
                             field_25: String = "",
                             field_26: String = "",
                             field_27: String = "",
                             field_28: String = "",
                             field_29: String = "",
                             field_30: String = "",
                             field_31: String = "",
                             field_32: String = "",
                             field_33: String = "",
                             field_34: String = "",
                             field_35: String = "",
                             field_36: String = "",
                             field_37: String = "",
                             field_38: String = "",
                             field_39: String = "",
                             field_40: String = "",
                             field_41: String = "",
                             field_42: String = "",
                             field_43: String = "",
                             field_44: String = "",
                             field_45: String = "",
                             field_46: String = "",
                             field_47: String = "",
                             field_48: String = "",
                             field_49: String = "",
                             field_50: String = "",
                             field_51: String = "",
                             field_52: String = "",
                             field_53: String = "",
                             field_54: String = "",
                             field_55: String = "",
                             field_56: String = "",
                             field_57: String = "",
                             field_58: String = "",
                             field_59: String = "",
                             field_60: String = ""
                           )

      case class StructData(
                             field_1: Option[Int],
                             field_2: Option[Long],
                             field_3: Option[Double],
                             field_4: Option[Float],
                             field_5: Option[Short],
                             field_6: Option[Byte],
                             field_7: Option[Boolean]
                           )

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("ThreadSafeToDF()")
          .getOrCreate()

        import spark.implicits._

        try {
          (1 to 10).par.foreach { _ =>
            (1 to 10000).map(_ => SampleData()).toDF()
          }
        } finally {
          spark.close()
        }
      }
    }

Run it repeatedly with spark-submit:

    for i in {1..200}; do
      echo "iter: $i (`date`)";
      ./bin/spark-submit --class example.SparkToDF your.jar
    done

You may get an exception message like "Schema for type A is not supported":
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Float] is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:812)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:743)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:742)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:407)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:406)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:406)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:176)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:176)
at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:164)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)

From the error message, the exception is thrown by `ScalaReflection.schemaFor()`.
I have looked at the code; it seems Spark uses Scala reflection to derive the data types, and I know that Scala reflection has concurrency issues:
https://issues.apache.org/jira/browse/spark-26555
(thread safety of type matching in Scala reflection)
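
For reference, the bottom frames of the stack trace (newProductEncoder in SQLImplicits) are the implicit encoder derivation that toDF() triggers on every call. A rough sketch of what each parallel call effectively does (an illustration of the mechanism, not the actual library source; the helper name toDfByHand is mine):

    import example.SparkToDF.SampleData
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}

    // Illustration only: rdd.toDF() via spark.implicits._ derives a fresh product
    // encoder on every call, and that derivation runs ScalaReflection.schemaFor
    // on SampleData -- so parallel toDF() calls hit Scala reflection concurrently.
    def toDfByHand(spark: SparkSession, rdd: RDD[SampleData]): DataFrame = {
      val enc = Encoders.product[SampleData] // Scala reflection happens here, per call
      spark.createDataset(rdd)(enc).toDF()
    }
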
Is this the expected behavior? I could not find any documentation about thread safety when creating DataFrames.
I added a lock around the RDD-to-DataFrame conversion, which fixed the problem:

object toDFLock

import spark.implicits._

Seq(rdd1, rdd2).par.foreach { rdd =>
  toDFLock.synchronized(rdd.toDF()).write.text("xxx")
}
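
As a sketch of an alternative (my own assumption, not something I have verified against this race in every case): derive the encoder once on a single thread before the parallel block and reuse it, so Scala reflection runs only once and the parallel writes need no lock. Here spark, rdd1 and rdd2 are the same values as in the first snippet, and rdd1/rdd2 are assumed to be RDD[SampleData]:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Derive the encoder once, single-threaded; ScalaReflection runs only here.
    val sampleDataEnc: Encoder[SampleData] = Encoders.product[SampleData]

    Seq(rdd1, rdd2).par.foreach { rdd =>
      // Reuse the pre-derived encoder instead of deriving a new one per thread.
      spark.createDataset(rdd)(sampleDataEnc).toDF().write.text("xxx")
    }

Either way the write itself stays parallel; the synchronized block above only serializes the toDF() step.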

No answers yet.
