pyspark收集列表

4xrmg8kj  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(399)

我在pysparkDataframe中的一列上执行group by,并在另一列上执行collect list,以获取列1的所有可用值。如下所示。

Column_1 Column_2
A        Name1
A        Name2
A        Name3
B        Name1
B        Name2
C        Name1
D        Name1
D        Name1
D        Name1
D        Name1

我得到的输出是列2的collect列表,列1已分组。

Column_1 Column_2
A        [Name1,Name2,Name3]  
B        [Name1,Name2]
C        [Name1]
D        [Name1,Name1,Name1,Name1]

当collect列表中的所有值都相同时,我只想显示一次,而不是四次。下面是预期输出。
预期产量:

Column_1 Column_2
A        [Name1,Name2,Name3]  
B        [Name1,Name2]
C        [Name1]
D        [Name1]

有没有办法在Pypark中做到这一点?

yqlxgs2m

yqlxgs2m1#

使用 collect_set 消除重复并使用 array_sort (来自spark-2.4.0)对数组进行排序。
(或)使用 array_distinct (来自spark-2.4.0)以从collect\u列表中消除重复项。

df.show()

# +--------+--------+

# |Column_1|Column_2|

# +--------+--------+

# |       A|   Name1|

# |       A|   Name2|

# |       A|   Name3|

# |       B|   Name1|

# |       B|   Name2|

# |       C|   Name1|

# |       D|   Name1|

# |       D|   Name1|

# +--------+--------+

from pyspark.sql.functions import *    
df.groupBy('Column_1').agg(array_sort(collect_set(col('Column_2'))).alias("Column_2")).orderBy("Column_1").show(10,False)

# using array_distinct,array_sort functions

df.groupBy('Column_1').agg(array_sort(array_distinct(collect_list(col('Column_2')))).alias("Column_2")).orderBy("Column_1").show(10,False)

# +--------+---------------------+

# |Column_1|Column_2             |

# +--------+---------------------+

# |A       |[Name1, Name2, Name3]|

# |B       |[Name1, Name2]       |

# |C       |[Name1]              |

# |D       |[Name1]              |

# +--------+---------------------+
wb1gzix0

wb1gzix02#

除了我上面的评论之外,如果顺序与发生顺序有关(请检查输入df):

+--------+--------+
|Column_1|Column_2|
+--------+--------+
|A       |Name1   |
|A       |Name3   | <-Name3 occurs first
|A       |Name2   |
|B       |Name1   |
|B       |Name2   |
|C       |Name1   |
|D       |Name1   |
|D       |Name1   |
|D       |Name1   |
|D       |Name1   |
+--------+--------+

您可以首先分配索引和“删除重复项+收集”列表:

(df.withColumn("idx",F.monotonically_increasing_id()).dropDuplicates(["Column_1","Column_2"])
.orderBy("idx").groupby("Column_1").agg(F.collect_list("Column_2").alias("Column_2"))
 .orderBy("Column_1")).show(truncate=False)

+--------+---------------------+
|Column_1|Column_2             |
+--------+---------------------+
|A       |[Name1, Name3, Name2]|
|B       |[Name1, Name2]       |
|C       |[Name1]              |
|D       |[Name1]              |
+--------+---------------------+

相关问题