在Scala Spark中使用withColumn和UDF高效地向DataFrame添加多个列

5anewei6  于 8个月前  发布在  Scala
关注(0)|答案(2)|浏览(130)

我有一个DataFrame,它有三列:Data、Col1和Col2。我想使用scala中的withColumn函数,根据以下逻辑创建300个额外的列:

val outDataFrame = inputDataFrame;
var i = 0;
while (i < myObjList.size) {
    val x = myObjList.get(i)
    val stInd = x.getStartSplitIndex + 1
    val length = x.getEndSplitIndex - x.getStartSplitIndex
    val convertDataType = x.getDataType()
    val outColName = x.getName()

    // The following line is not working
    outDataFrame = outDataFrame.withColumn(outColName,col(myFunc(convertDataType,substring(col("Data"),stInd,length).toString())))

    i += 1
}

在此代码片段中,myObjList是一个长度为300的对象列表,每个对象表示创建新列的规范。myFunc函数执行数据转换,接受两个参数:convertDataType(字符串)和dataToConvert(字符串)。
stIndlength的值是从当前对象x派生的,提供了关于我应该使用Data列的哪个切片来生成相应的新列的信息。
我在寻求一个最佳方法来完成这项任务。输入DataFrame包含 * 亿 * 条记录,因此 * 性能优化至关重要 *。
谢谢

ou6hu8tu

ou6hu8tu1#

.withColumn返回一个 new restrame,你基本上放弃了它,并继续向原始的restrame添加列(并放弃)。
您应该努力编写更符合习惯的代码,这将有助于避免类似的问题以及未来的许多其他问题。

val newDF = myObjList.foldLeft(oldDF) { case (df, x) => 
 val start = x.getStartSplitIndex + 1
 val length = x.getEndSplitIndex - start + 1
 df.withColumn(
   x.getName, 
   col(
     myFunc(x.getDataType, substring(col("Data"), start, length))
   )
 )
}
lrpiutwd

lrpiutwd2#

不要使用withColumn,而是使用select和您在单个投影中构建的Seq[Column]。Spark并不总是优化预测,这可能会导致严重的性能损失。
myFunc尽可能使用内置的Spark函数,UDF明显较慢。如果你必须在做自定义转换逻辑时挤出每一滴性能,然后直接使用表达式并实现代码生成,但要注意接口是Spark内部的,并且可以在每个版本中更改。
它可以像这样简单(取自Quality):

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object AsUUID {
  def apply(lower: Column, higher: Column): Column =
    new Column(AsUUID(lower.expr, higher.expr))
}

/**
 * Converts a lower and higher pair of longs into a uuid string
 * @param left
 * @param right
 */
case class AsUUID(left: Expression, right: Expression) extends BinaryExpression {
  protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = copy(left = newLeft, right = newRight)

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, (lower, higher) => s"org.apache.spark.unsafe.types.UTF8String.fromString(new java.util.UUID($higher, $lower).toString())")

  override def dataType: DataType = StringType

  override protected def nullSafeEval(lower: Any, higher: Any): Any =
    UTF8String.fromString(new java.util.UUID(higher.asInstanceOf[Long], lower.asInstanceOf[Long]).toString)
}

允许您用途:

df.select(AsUUID(col("lower"), col("higher")))

但这段代码将与其他Spark代码生成器一起执行,并且不会因为转换为Scala类型并通过编码器再次转换回Catalyst而导致性能损失。只有当它能带来显著的节省时才走这条路,例如。如果对于10亿行,使用UDF需要1小时,使用表达式需要40m,那么节省20m可能是值得的(示例数字,但UDF通常慢2倍)。使用内部API的学习曲线和障碍可能不值得在性能上获得一点小的提高,除非您将在其他地方使用这些表达式。

相关问题