Scala: Spark Aggregator with an array as input

f3temu5u · posted 5 months ago in Scala

I'm migrating some Scala Spark UDAFs from UserDefinedAggregateFunction to Aggregator. One of them takes an Array[String] as input, and when I run the Aggregator in a local test I get a strange exception. I've reduced the code to a very basic Aggregator, but I still get the error as soon as it reads arrays; other input types work fine.
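For context, the legacy API I'm migrating away from is UserDefinedAggregateFunction (deprecated since Spark 3.0). Below is a minimal sketch of what the old-style version of this aggregator would look like; the class name and schemas are illustrative, not my original code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class ArrayInputUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("arr", ArrayType(StringType)) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("cnt", IntegerType) :: Nil)
  override def dataType: DataType = IntegerType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0
  // add the length of each incoming array to the running count
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getInt(0) + input.getSeq[String](0).length
  override def merge(b1: MutableAggregationBuffer, b2: Row): Unit =
    b1(0) = b1.getInt(0) + b2.getInt(0)
  override def evaluate(buffer: Row): Any = buffer.getInt(0)
}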
The reduced Aggregator example is as follows:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {

  // empty buffer: no array elements counted yet
  override def zero: Int = 0

  // add the length of each incoming array to the running count
  override def reduce(buffer: Int, newItem: Array[String]): Int = {
    buffer + newItem.length
  }

  override def merge(b1: Int, b2: Int): Int = {
    b1 + b2
  }

  override def finish(reduction: Int): Int = reduction

  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

I test it with this code:

import org.apache.spark.sql.functions.udaf
import spark.implicits._ // for the $"arr" column syntax

val test = udaf(new ArrayInputAggregator())

val d = spark
  .sql("select array('asd','tre','asd') arr")
  .groupBy()
  .agg(test($"arr").as("cnt"))

d.show
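
The single test row contains a three-element array, so on a Spark version where this works, d.show should print something like:

+---+
|cnt|
+---+
|  3|
+---+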


This is the exception I get:
2023-12-24 12:06:24,678 ERROR spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException: null
	at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveEncodersInUDF$$anonfun$apply$31$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3018) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
	(the stack trace continues)


i7uaboj4 · answer 1#

This bug has been fixed in Spark 3.0.1 and later; upgrading from 3.0.0 makes the example above work as expected.
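If you are pinned to 3.0.0, the straightforward fix is to bump the Spark dependency. A minimal sketch assuming an sbt build (adjust for Maven or Gradle as needed):

// build.sbt
// 3.0.0 triggers the NullPointerException; the fix ships in 3.0.1 and later
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"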
