为什么spark(scalaapi)agg函数采用expr和exprs参数?

laik7k3q  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(612)

Sparkapi RelationalGroupedDataset 有一个功能 agg :

@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
  toDF((expr +: exprs).map {
    case typed: TypedColumn[_, _] =>
      typed.withInputType(df.exprEnc, df.logicalPlan.output).expr
    case c => c.expr
  })
}

为什么需要两个独立的论点?为什么不能 exprs: Column* ? 有人有一个隐式函数只接受一个参数吗?

qyswt5oh

qyswt5oh1#

我试着想象它将如何使用 cats.data.NonEmptyList (需要 cats-core 附属国: libraryDependencies += "org.typelevel" %% "cats-core" % "2.1.1" ):

import cats.data.NonEmptyList

implicit class RelationalGroupedDatasetOps(
  private val rgd: RelationalGroupedDataset
) {
  def aggOnNonEmpty(nonEmptyColumns: NonEmptyList[Column]): DataFrame =
    rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)

  def aggUnsafe(columnList: List[Column]): DataFrame = {
    val nonEmptyColumns = NonEmptyList.fromListUnsafe(columnList)
    rgd.agg(nonEmptyColumns.head, nonEmptyColumns.tail:_*)
  }
}

对于scala 2.12,使用标准库 List :

implicit class RelationalGroupedDatasetOps(
  private val rgd: RelationalGroupedDataset
) {
  def aggUnsafe(aggColumns: List[Column]): DataFrame =
    aggColumns match {
      case ::(head, tail) => rgd.agg(head, tail:_*) 
      case Nil => throw new IllegalArgumentException(
        "aggColumns parameter can not be empty for aggregation"
      )
    }
}

使用示例:

import Implicits.RelationalGroupedDatasetOps

// some data with columns id, category(int), amount(double)
val df: DataFrame = ???
df.groupBy("id")
  .aggUnsafe(
    df.columns.filter(c => c != "id").map(c => sum(c))
  ) // returns aggregated DataFrame
3z6pesqy

3z6pesqy2#

这是为了确保至少指定一个参数。
纯varargs不能做到这一点,您可以在没有任何参数的情况下调用该方法。

相关问题