WrappedArray$ofRef cannot be cast to [J in a Scala UDF

p4rjhz4m · published 2021-05-27 in Spark

I'm trying to write a UDF in Scala and use it from PySpark. I wrote a simple UDF that takes an array<struct<dt:string,days:array>> and must return an array. The main question is which value types the UDF should declare. Here it is:

class PackDataPeriod extends UDF3[Seq[Row], String, Int, Seq[Long]] {
  override def call(data: Seq[Row], start_date: String, weeks: Int): Seq[Long] = {
    val result = new Array[Long](weeks * 7)
    result
  }
}

I use it like this:

spark.udf.registerJavaFunction(
    "PackDataPeriod", "ru.mail.dm.udfs.PackDataPeriod", ArrayType(LongType())
)
df.withColumn("data_", F.expr("PackDataPeriod(data, '2020-08-17', 2)"))

It fails with:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [J
    at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1$$anonfun$2.apply(PackDataPeriod.scala:16)
    at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1$$anonfun$2.apply(PackDataPeriod.scala:16)
    at scala.Option.map(Option.scala:146)
    at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1.apply$mcVI$sp(PackDataPeriod.scala:16)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at ru.mail.dm.udfs.PackDataPeriod.call(PackDataPeriod.scala:13)
    at ru.mail.dm.udfs.PackDataPeriod.call(PackDataPeriod.scala:9)
    at org.apache.spark.sql.UDFRegistration$$anonfun$263.apply(UDFRegistration.scala:789)
    ... 21 more

I tried changing Seq[Row] to Array[Row] and so on, but that didn't help :(
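The stack trace points at line 16 inside `call`, which suggests the body reads each struct's `days` field and casts it to a primitive `Array[Long]` (`[J` is JVM notation for `long[]`). At runtime, however, Spark hands array columns to a Java UDF as a `scala.collection.mutable.WrappedArray` of boxed values, and that cast throws. A minimal stdlib-only sketch of the mismatch and a working conversion (no Spark on the classpath; the boxed Seq below stands in for what Spark actually supplies):

```scala
// Spark passes an array<bigint> column into a Java UDF as a Seq with boxed
// elements (WrappedArray$ofRef in Scala 2.12), never as a primitive Array[Long].
val days: Any = Array[Any](0L, 1L, 2L).toSeq  // boxed, like Spark's $ofRef

// days.asInstanceOf[Array[Long]]  // throws: ... cannot be cast to [J

// Treat it as a Seq and unbox element-wise instead:
val fixed: Seq[Long] = days.asInstanceOf[Seq[Any]].map(_.asInstanceOf[Long])
println(fixed.sum)  // prints 3
```

Inside `call`, the same idea means reading the nested field as `row.getAs[Seq[Long]]("days")` (or `row.getSeq`) rather than casting to `Array[Long]` (the field name `days` is assumed from the schema in the question).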
I tested the function locally, and there it works:

val schemaUntyped = new StructType()
  .add("dt", StringType)
  .add("ev_cnt", ArrayType(LongType))

val row = new GenericRowWithSchema(Array("2020-08-17", (0L to 6L).toArray), schema = schemaUntyped)
val pack = new PackDataPeriod()
println(pack.call(Array(row), "2020-08-17", 2))
WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
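The local test likely passes precisely because the row is built by hand with a genuine `Array[Long]`, so any cast to `Array[Long]` succeeds there, whereas under Spark the same field arrives as a boxed `WrappedArray`. A small plain-Scala sketch of the difference:

```scala
// Locally the row is constructed with a real primitive array ([J on the JVM),
// so the cast that fails under Spark succeeds here.
val local: Any = (0L to 6L).toArray
val castOk: Array[Long] = local.asInstanceOf[Array[Long]]  // fine locally

// Spark supplies a boxed Seq instead, so the identical cast throws at runtime.
val atRuntime: Any = Array[Any](0L, 1L).toSeq
val threw =
  try { atRuntime.asInstanceOf[Array[Long]]; false }
  catch { case _: ClassCastException => true }
println(threw)  // prints true
```

So a green local test built from hand-made rows does not prove the UDF's casts match what Spark passes in at execution time.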

No answers yet.