Scala ProductEncoder not working as expected inside a user-defined function (UDF)

mbskvtky posted 6 months ago in Scala

When running the following code in a Databricks notebook:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.udf

case class DataRow(field1: String)

val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._

val udf1 = udf((x: String) => {
  val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test") // This is failing at runtime
  3
})
val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test")) // This is working
display(df1)

At runtime I get a ScalaReflectionException:

ScalaReflectionException: class $linedb3da7b2933d4b63a62b3d2a21c675f2141.$read in JavaMirror with com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader@717115ad of type class com.databricks.backend.daemon.driver.DriverLocal$DriverLocalClassLoader with classpath [] and parent being com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader@1670897 of type class com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader with classpath [<unknown>] and parent being com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@1a42da0a of type class com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader with classpath [file:/local_disk0/tmp/repl/spark-4071811259476162981-8e526ae9-25fb-4545-8d3f-963a8661cd2b/] and parent being sun.misc.Launcher$AppClassLoader@43ee72e6 of type class sun.misc.Launcher$AppClassLoader with classpath [...] not found. 
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:141) 
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:29) 
at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator6$1.apply(command-2013905963200886:11) 
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237) 
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237) 
at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:848) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55) 
at org.apache.spark.sql.Encoders$.product(Encoders.scala:312) 
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:302) 
at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:302) 
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34) 
at $linedb3da7b2933d4b63a62b3d2a21c675f2196.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$c35395a9233a7197629c985e87dce75$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$udf1$1(command-2013905963200886:11) 


When the case class is instead declared inside the UDF:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.udf

case class DataRow(field1: String)

val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._

val udf1 = udf((x: String) => {
  case class DataRow(field1: String) // shadows the top-level DataRow
  val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test") // This now fails at compile time
  3
})
val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test").withColumn("udf", udf1($"test")) // This is working
display(df1)


I get a different error:

error: value toDF is not a member of Seq[DataRow]
       val testData = Seq(DataRow("test1"), DataRow("test2")).toDF("test")


Questions:

  • Is this expected behavior or a bug?

I have also filed this as a bug against Apache Spark.
Thanks.


h4cxqtbf1#

You cannot create Datasets inside a UDF: the SparkContext does not exist on the executors. You may sometimes get it to behave, in a notebook or in local mode, but it is not something you should rely on. Moreover, UDFs, like expressions, cannot return Datasets, only types supported by encoders. One way to restructure the code is sketched below.
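As a minimal sketch of that advice, assuming the real goal of the UDF is to consult some reference data: do the Dataset work on the driver, collect it into a plain Scala value, and let the UDF close over that. The lookup set, the membership check, and the 0/1 result below are hypothetical stand-ins, not part of the original code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

case class DataRow(field1: String)

val sparkSession = SparkSession.builder.getOrCreate()
import sparkSession.implicits._

// Do all Dataset/DataFrame work on the driver, then collect the result
// into a plain Scala value that can be captured by the UDF closure.
val lookup: Set[String] = Seq(DataRow("test1"), DataRow("test2"))
  .toDS()
  .map(_.field1)
  .collect()
  .toSet

// The UDF never touches the SparkSession and returns a plain Int,
// which is an encoder-supported type.
val udf1 = udf((x: String) => if (lookup.contains(x)) 1 else 0)

val df1 = Seq(DataRow("test1"), DataRow("test2")).toDF("test")
  .withColumn("udf", udf1($"test"))
display(df1)

For larger reference data, sparkSession.sparkContext.broadcast(lookup) would avoid shipping the set with every task.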
The built-in implicit Product encoder derivation only works for top-level classes. From the code above it is not clear whether that holds in the first example, but it is definitely not the case in the second one.
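To make that concrete, here is an illustrative standalone program (not from the original post): encoder derivation needs a TypeTag, which the compiler cannot materialize for a class declared inside a method, so the implicit toDF conversion silently never applies there.

import org.apache.spark.sql.SparkSession

// Top-level case class: the compiler can materialize a TypeTag for it,
// so the implicit Product encoder (and hence .toDF/.toDS) is available.
case class TopLevelRow(field1: String)

object EncoderScopeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    // Compiles and runs: encoder derived for the top-level class.
    Seq(TopLevelRow("a"), TopLevelRow("b")).toDF().show()

    def localScope(): Unit = {
      // Declared inside a method: no TypeTag is available, so no implicit
      // Encoder can be derived. Uncommenting the line below reproduces the
      // compile error "value toDF is not a member of Seq[LocalRow]".
      case class LocalRow(field1: String)
      // Seq(LocalRow("a")).toDF()
    }
    localScope()

    spark.stop()
  }
}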
