我有一个DataFrame,它有三列:Data、Col1和Col2。我想使用scala中的withColumn函数,根据以下逻辑创建300个额外的列:
val outDataFrame = inputDataFrame;
var i = 0;
while (i < myObjList.size) {
val x = myObjList.get(i)
val stInd = x.getStartSplitIndex + 1
val length = x.getEndSplitIndex - x.getStartSplitIndex
val convertDataType = x.getDataType()
val outColName = x.getName()
// The following line is not working
outDataFrame = outDataFrame.withColumn(outColName,col(myFunc(convertDataType,substring(col("Data"),stInd,length).toString())))
i += 1
}
在此代码片段中,myObjList
是一个长度为300的对象列表,每个对象表示创建新列的规范。myFunc
函数执行数据转换,接受两个参数:convertDataType
(字符串)和dataToConvert
(字符串)。stInd
和length
的值是从当前对象x
派生的,提供了关于我应该使用Data列的哪个切片来生成相应的新列的信息。
我在寻求一个最佳方法来完成这项任务。输入DataFrame包含 * 亿 * 条记录,因此 * 性能优化至关重要 *。
谢谢
2条答案
按热度按时间ou6hu8tu1#
.withColumn
返回一个 new restrame,你基本上放弃了它,并继续向原始的restrame添加列(并放弃)。您应该努力编写更符合习惯的代码,这将有助于避免类似的问题以及未来的许多其他问题。
lrpiutwd2#
不要使用withColumn,而是使用select和您在单个投影中构建的Seq[Column]。Spark并不总是优化预测,这可能会导致严重的性能损失。
myFunc尽可能使用内置的Spark函数,UDF明显较慢。如果你必须在做自定义转换逻辑时挤出每一滴性能,然后直接使用表达式并实现代码生成,但要注意接口是Spark内部的,并且可以在每个版本中更改。
它可以像这样简单(取自Quality):
允许您用途:
但这段代码将与其他Spark代码生成器一起执行,并且不会因为转换为Scala类型并通过编码器再次转换回Catalyst而导致性能损失。只有当它能带来显著的节省时才走这条路,例如。如果对于10亿行,使用UDF需要1小时,使用表达式需要40m,那么节省20m可能是值得的(示例数字,但UDF通常慢2倍)。使用内部API的学习曲线和障碍可能不值得在性能上获得一点小的提高,除非您将在其他地方使用这些表达式。