I'm trying to write a UDF in Scala and use it from PySpark. I wrote a simple UDF that takes array<struct<dt:string,days:array>> and must return an array. The main problem is figuring out which types the UDF should declare. Here it is:
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.java.UDF3

class PackDataPeriod extends UDF3[Seq[Row], String, Int, Seq[Long]] {
  override def call(data: Seq[Row], start_date: String, weeks: Int): Seq[Long] = {
    val result = new Array[Long](weeks * 7)
    result
  }
}
I register and call it like this:
spark.udf.registerJavaFunction(
"PackDataPeriod", "ru.mail.dm.udfs.PackDataPeriod", ArrayType(LongType())
)
df.withColumn("data_", F.expr(f"PackDataPeriod(data, '2020-08-17', 2)"))
It fails with:
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [J
at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1$$anonfun$2.apply(PackDataPeriod.scala:16)
at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1$$anonfun$2.apply(PackDataPeriod.scala:16)
at scala.Option.map(Option.scala:146)
at ru.mail.dm.udfs.PackDataPeriod$$anonfun$call$1.apply$mcVI$sp(PackDataPeriod.scala:16)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at ru.mail.dm.udfs.PackDataPeriod.call(PackDataPeriod.scala:13)
at ru.mail.dm.udfs.PackDataPeriod.call(PackDataPeriod.scala:9)
at org.apache.spark.sql.UDFRegistration$$anonfun$263.apply(UDFRegistration.scala:789)
... 21 more
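The trace points at line 16 of PackDataPeriod.scala, which is beyond the stub quoted above, so the failing cast is in code not shown here. A plausible cause (an assumption on my part): reading the struct's nested array field as Array[Long]. Spark hands an array<bigint> column to a Scala UDF as a scala.collection.Seq of boxed longs (concretely a WrappedArray in Spark 2.x), never as a primitive JVM array ([J). A minimal sketch of the difference:

```scala
// Simulate what Spark passes for an array<bigint> struct field: a Seq of
// boxed longs (concretely a WrappedArray), not a primitive long[] ([J).
val fromSpark: Any = (0L to 6L).toSeq

// Casting to Array[Long] reproduces the exception from the trace:
// fromSpark.asInstanceOf[Array[Long]]  // ... cannot be cast to [J

// Reading it as a Seq succeeds (row.getSeq[Long](i) does this inside a Row):
val days: Seq[Long] = fromSpark.asInstanceOf[Seq[Long]]
println(days.sum)  // prints 21
```

So inside call(), field access like row.getSeq[Long](1) should be safe where row.getAs[Array[Long]]("days") would throw; this is a sketch of the type issue, not a verified fix for the unquoted line 16.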
I tried changing Seq[Row] to Array[Row] and so on; it didn't help :(
I tested the function locally, and there it works:
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val schemaUntyped = new StructType()
  .add("dt", StringType)
  .add("ev_cnt", ArrayType(LongType))
val row = new GenericRowWithSchema(Array("2020-08-17", (0L to 6L).toArray), schema = schemaUntyped)
val pack = new PackDataPeriod()
println(pack.call(Array(row), "2020-08-17", 2))
Output: WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)