通过消除空值来合并apachespark中的行

rslzwgfq  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(289)

我有一个SparkDataframe如下

+---+----+----+----+----+----+----+
| id|   1|   2|   3|sf_1|sf_2|sf_3|
+---+----+----+----+----+----+----+
|  2|null|null|null| 102| 202| 302|
|  4|null|null|null| 104| 204| 304|
|  1|null|null|null| 101| 201| 301|
|  3|null|null|null| 103| 203| 303|
|  1|  11|  21|  31|null|null|null|
|  2|  12|  22|  32|null|null|null|
|  4|  14|  24|  34|null|null|null|
|  3|  13|  23|  33|null|null|null|
+---+----+----+----+----+----+----+

我想通过合并空行来转换如下所示的Dataframe

+---+----+----+----+----+----+----+
| id|   1|   2|   3|sf_1|sf_2|sf_3|
+---+----+----+----+----+----+----+
|  1|  11|  21|  31| 101| 201| 301|
|  2|  12|  22|  32| 102| 202| 302|
|  4|  14|  24|  34| 104| 204| 304|
|  3|  13|  23|  33| 103| 203| 303|
+---+----+----+----+----+----+----+

最好是在斯卡拉。

sulc1iza

sulc1iza1#

这是一种做事的方式。

val inputColumns = inputLoadDF.columns.toList.drop(0)
val exprs = inputColumns.map(x => first(x,true))
inputLoadDF.groupBy("id").agg(exprs.head,exprs.tail:_*).show()
sg2wtvxw

sg2wtvxw2#

你可以分组 id 和骨料使用 firstignorenulls 对于其他列:

import pyspark.sql.functions as F

(df.groupBy('id').agg(*[F.first(x,ignorenulls=True) for x in df.columns if x!='id'])
.show())
+---+----+----+----+-----+-----+-----+
| id|   1|   2|   3| sf_1| sf_2| sf_3|
+---+----+----+----+-----+-----+-----+
|  1|11.0|21.0|31.0|101.0|201.0|301.0|
|  3|13.0|23.0|33.0|103.0|203.0|303.0|
|  2|12.0|22.0|32.0|102.0|202.0|302.0|
|  4|14.0|24.0|34.0|104.0|204.0|304.0|
+---+----+----+----+-----+-----+-----+

相关问题