pyspark在pivot之后的agg()中使用alias函数时的奇怪行为

xiozqbni  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(872)

在旋转后使用alias函数重命名agg()中的列时,我遇到了一个奇怪的行为。此代码适用于:

tst = sqlContext.createDataFrame([(1,2,3,4),(3,2,5,4),(5,3,7,5),(7,3,9,5)],schema=['col1','col2','col3','col4'])
chk= tst.groupby('col1').pivot('col2').agg(F.sum('col3').alias('sum'),F.mean('col3').alias('mean'))

当我检查这个df的列时,名称和预期的一样

chk.columns
Out[54]: ['col1', '2_sum', '2_mean', '3_sum', '3_mean']

但是当我只有一个聚合,后跟pivot时,重命名就不起作用了。

import pyspark.sql.functions as F

# Test data

tst = sqlContext.createDataFrame([(1,2,3,4),(3,2,5,4),(5,3,7,5),(7,3,9,5)],schema=['col1','col2','col3','col4'])
chk= tst.groupby('col1').pivot('col2').agg(F.sum('col3').alias('sum'))

现在,当我检查列结果时,重命名不起作用

chk.columns
Out[56]: ['col1', '2', '3']

这是spark的预期行为吗?我错过什么了吗?

tpgth1q7

tpgth1q71#

您可能想看看pivot的spark git源代码

override def output: Seq[Attribute] = {
    val pivotAgg = aggregates match {
      case agg :: Nil =>
        pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
      case _ =>
        pivotValues.flatMap { value =>
          aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
        }
    }
    groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
  }

您可以观察到输出列没有被 agg.sql 当pivot后面只有一个agggate表达式时
当存在单个聚合表达式时,将提供output属性 pivot value 作为名称-

pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())

结论-这种行为是意料之中的,并不奇怪。

相关问题