pyspark 获取group by中的第一个非空值(Spark 1.6)

xtfmy6hx  于 2023-01-12  发布在  Spark
关注(0)|答案(3)|浏览(113)

如何从group by中获取第一个非空值?我尝试将first与coalesceF.first(F.coalesce("code"))一起使用,但没有得到所需的行为(似乎得到了第一行)。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

我试过:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())

预期输出

[Row(id='a', code='code1', name='name2')]
uz75evzq

uz75evzq1#

对于Spark 1.3 - 1.5,这可以做到这一点:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

编辑

显然,在1.6版本中,他们改变了first聚合函数的处理方式,现在,底层类First应该用第二个参数ignoreNullsExpr参数构造,first聚合函数还没有使用这个参数(如here所示)。然而,在Spark 2.0中,它将能够调用agg(F.first(col, True))来忽略空值(如here所示)。
因此,Spark 1.6的方法必须有所不同,而且效率更低。一个想法如下:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

也许有更好的选择。如果我找到了答案,我会修改的。

ftf50wuq

ftf50wuq2#

因为我对每个分组只有一个非空值,所以在1.6中使用min / max对我的目的是有效的:

(df
  .groupby("id")
  .agg(F.min("code"),
       F.min("name"))
  .show())

+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
|  a|    code1|    name2|
+---+---------+---------+
vjhs03f7

vjhs03f73#

第一个方法接受参数ignorenulls,它可以被设置为真,
巨蟒:

df.groupby("id").agg(first(col("code"), ignorenulls=True).alias("code"))

斯卡拉:

df.groupBy("id").agg(first(col("code"), ignoreNulls = true).alias("code"))

相关问题