I am migrating some Scala Spark UDAFs from UserDefinedAggregateFunction to Aggregator. One of them takes an Array[String] as input, and when I execute the Aggregator in a local test I get a strange exception. I have reduced the code to a very basic Aggregator, but I still get the error whenever the input is an array. I have no problem with other input types.

A minimal example:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {
  override def zero: Int = 0
  override def reduce(buffer: Int, newItem: Array[String]): Int =
    buffer + newItem.length
  override def merge(b1: Int, b2: Int): Int =
    b1 + b2
  override def finish(reduction: Int): Int = reduction
  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
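(As a sanity check, the aggregation logic itself is just a fold and can be exercised without Spark. This is a plain-Scala sketch of the same zero/reduce/merge/finish semantics, using hypothetical in-memory "partitions"; it is not part of the original question.)

```scala
// Plain-Scala model of the Aggregator's semantics (no Spark involved):
// zero = 0, reduce adds the length of each array, merge sums partial counts.
object ArrayCountDemo {
  def zero: Int = 0
  def reduce(buffer: Int, newItem: Array[String]): Int = buffer + newItem.length
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction

  def main(args: Array[String]): Unit = {
    // Two hypothetical partitions of input rows.
    val partitions = Seq(
      Seq(Array("asd", "tre", "asd")),  // partition 1: one array of 3 strings
      Seq(Array("x"), Array("y", "z"))  // partition 2: arrays of 1 and 2 strings
    )
    // Fold each partition with reduce, then combine the partials with merge.
    val partials = partitions.map(_.foldLeft(zero)(reduce))
    val result = finish(partials.reduce(merge))
    println(result) // total number of strings across all arrays: 6
  }
}
```

This mirrors how Spark would apply the Aggregator per partition and then merge the partial buffers; the failure in the question is therefore in the encoder/analysis machinery, not in the aggregation logic.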
I test it with this code:
import org.apache.spark.sql.functions.udaf

val test = udaf(new ArrayInputAggregator())
val d = spark
.sql("select array('asd','tre','asd') arr")
.groupBy()
.agg(test($"arr").as("cnt"))
d.show
This is the exception I get:
2023-12-24 12:06:24,678 ERROR spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException: null
    at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveValidizer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveValidizer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveValidizer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveValidizer$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3018) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)
(the trace continues)
1 Answer
This bug has been fixed in Spark 3.0.1 and later. The stack trace shows you are running spark-catalyst_2.12-3.0.0; upgrading past 3.0.0 makes the NullPointerException go away.
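(For an sbt-based project, the fix is just a dependency bump. This is a hedged sketch assuming a typical build.sbt; adjust the Scala and Spark versions to match your project.)

```scala
// build.sbt (sketch): move off the affected 3.0.0 release.
// Any 3.0.1+ version resolves the Array-input Aggregator NPE described above.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1" % Provided
```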