When I run the following code in a Databricks notebook:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.udf
case class DataRow(field1: String)
val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._
val udf1 = udf((x: String) => {
val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test") // This is failing at runtime
3
})
val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test")) // This is working
display(df1)
I get a ScalaReflectionException at runtime:
ScalaReflectionException: class $linedb3da7b2933d4b63a62b3d2a21c675f2141.$read in JavaMirror with com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader@717115ad of type class com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader with classpath [] and parent being com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader@1670897 of type class com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader with classpath [<unknown>] and parent being com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@1a42da0a of type class com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader with classpath [file:/local_disk0/tmp/repl/spark-4071811259476162981-8e526ae9-25fb-4545-8d3f-963a8661cd2b/] and parent being sun.misc.Launcher$AppClassLoader@43ee72e6 of type class sun.misc.Launcher$AppClassLoader with classpath [...] not found.
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:141)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:29)
at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator6$1.apply(command-2013905963200886:11)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:848)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:312)
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:302)
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:302)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$udf1$1(command-2013905963200886:11)
When I instead declare the case class inside the UDF:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.udf
case class DataRow(field1: String)
val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._
val udf1 = udf((x: String) => {
case class DataRow(field1: String)
val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test") // This is failing at runtime
3
})
val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test")) // This is working
display(df1)
I get a different error:
error: value toDF is not a member of Seq[DataRow]
val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test")
Question:
- Is this expected behavior or a bug?
I have also filed this as a bug against Apache Spark.
Thanks.
1 Answer
You cannot create a Dataset inside a UDF: the SparkContext does not exist on the executors. You may observe it working sometimes, in a notebook or in local mode, but it is not behavior you should rely on. Moreover, UDFs, like expressions, cannot return Datasets, only types supported by encoders.
The built-in implicit Product encoder derivation works only for top-level classes. It is not clear from the code above whether that holds in the first example, but it certainly does not in the second, where DataRow is defined inside the UDF body.
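A minimal sketch of the corrected pattern, following the points above: keep the case class at the top level, build all DataFrames on the driver, and have the UDF return an encoder-supported type such as Int (the object wrapper and the lengthUdf name are my own, not from the question):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Top-level case class: the implicit Product encoder derivation only
// works for classes defined at this level, not inside a method or UDF.
case class DataRow(field1: String)

object UdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Build DataFrames on the driver only; never call toDF or
    // createDataFrame inside a UDF body, which runs on the executors.
    val df = Seq(DataRow("test1"), DataRow("test2")).toDF("test")

    // The UDF returns a plain Int, a type supported by encoders.
    val lengthUdf = udf((x: String) => x.length)

    df.withColumn("udf", lengthUdf($"test")).show()
  }
}
```

If the UDF needs to return structured data, a top-level case class also works as a return type, since Spark can derive an encoder for it; a Dataset or DataFrame never does.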