pyspark-preserve collect list和collect set在多列上的顺序

sqougxex  于 2021-05-27  发布在  Spark
关注(0)|答案(4)|浏览(954)

我有下面的pysparkDataframe。

Column_1 Column_2 Column_3 Column_4
1        A        U1       12345
1        A        A1       549BZ4G

预期产量:
按第1列和第2列分组。收集集合第3列和第4列,同时保留输入Dataframe中的顺序。它应该与输入的顺序相同。第3列和第4列之间的排序没有依赖关系。两者都必须保留输入Dataframe顺序

Column_1 Column_2 Column_3 Column_4
1        A        U1,A1    12345,549BZ4G

到目前为止我尝试的是:
我第一次尝试使用窗口方法。其中,我按第1列和第2列划分,按第1列和第2列排序。然后我按第1列和第2列进行分组,并在第3列和第4列进行收集。
我没有得到预期的结果。我的结果如下。

Column_1 Column_2 Column_3 Column_4
1        A        U1,A1    549BZ4G,12345

我还尝试使用单调递增的id来创建索引,然后按索引排序,然后执行group by和collect set来获得输出。但还是没有运气。
是因为字母数字和数字值吗?如何保持第3列和第4列在输入中的顺序不变。

kqlmhetl

kqlmhetl1#

spark中的所有collect函数(collect\u set、collect\u list)都是不确定的,因为收集结果的顺序取决于底层Dataframe中的行的顺序,这也是不确定的。
因此得出结论,使用spark-collect函数不能真正保持顺序
参考-功能.scala spark repo

bwleehnv

bwleehnv2#

我不知道为什么它没有显示您正确的结果,对我来说,这是按照预期的结果-
输入

df_a = spark.createDataFrame([(1,'A','U1','12345'),(1,'A','A1','549BZ4G')],[ "col_1","col_2","col_3","col_4"])

逻辑

from pyspark.sql import functions as F
df_a = df_a.groupBy('col_1','col_2').agg(F.collect_list('col_3').alias('col_3'), F.collect_list('col_4').alias('col_4'))

输出

df_a.show()
+-----+-----+--------+----------------+
|col_1|col_2|   col_3|           col_4|
+-----+-----+--------+----------------+
|    1|    A|[U1, A1]|[12345, 549BZ4G]|
+-----+-----+--------+----------------+
r7s23pms

r7s23pms3#

使用struct怎么样?

val result = df.groupBy(
  "Column_1", "Column_2"
).agg(
  collect_list(
    struct(col("Column_3"), col("Column_4"))
  ).alias("coll")
).select(
  col("Column_1"), col("Column_2"), col("coll.Column_3"), col("coll.Column_4")
)

这应该会产生预期的结果。诀窍是struct保留命名元素,以便您可以通过引用它们。存取器。它也适用于arraytype(structtype)。
通过struct对并置的概念进行分组感觉很自然,因为这是您试图在这里保留的结构关系。
理论上,您甚至可能不想解压结构,因为这些值似乎有依赖关系。

ahy6op9u

ahy6op9u4#

使用 monotically_increasing_id spark的功能来维持订单。你可以在这里找到更多信息


# InputDF

    # +----+----+----+-------+
    # |col1|col2|col3|   col4|
    # +----+----+----+-------+
    # |   1|   A|  U1|  12345|
    # |   1|   A|  A1|549BZ4G|
    # +----+----+----+-------+

    df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("Col1", "col2").agg(F.collect_list("col4").alias("Col4"),F.collect_list("col3").alias("Col3"))

    df1.select("col1", "col2",F.array_join("col3", ",").alias("col3"),F.array_join("col4", ",").alias("col4")).show()

    # OutputDF
    # +----+----+-----+-------------+
    # |col1|col2| col3|         col4|
    # +----+----+-----+-------------+
    # |   1|   A|U1,A1|12345,549BZ4G|
    # +----+----+-----+-------------+

使用 array_distinct 在…之上 collect_list 有不同的价值观并维持秩序。


# InputDF

    # +----+----+----+-------+
    # |col1|col2|col3|   col4|
    # +----+----+----+-------+
    # |   1|   A|  U1|  12345|
    # |   1|   A|  A1|549BZ4G|
    # |   1|   A|  U1|123456 |
    # +----+----+----+-------+

    df1 = df.withColumn("id", F.monotonically_increasing_id()).groupby("Col1", "col2").agg(
        F.array_distinct(F.collect_list("col4")).alias("Col4"),F.array_distinct(F.collect_list("col3")).alias("Col3"))

    df1.select("col1", "col2", F.array_join("col3", ",").alias("col3"), F.array_join("col4", ",").alias("col4")).show(truncate=False)

    # +----+----+-----+---------------------+
    # |col1|col2|col3 |col4                 |
    # +----+----+-----+---------------------+
    # |1   |A   |U1,A1|12345,549BZ4G,123456 |
    # +----+----+-----+---------------------+

相关问题