python—如何在pyspark中按列对Dataframe进行分组,并获得一个以该列为键、记录列表为值的字典?

4nkexdtk  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(288)

我有一个这样的Dataframe-

-RECORD 0-------------------------------------------

 id                          | 11           
 order_number                | 254                  
 order_date                  | 2021-03-09           
 store_id                    | abc6            
 employee_code               | 6921_abc40    
 customer_name               | harvey 
 contact_number              | 353          
 address                     | foo 
 locality                    | foo               
 postal_code                 | 5600082332             
 order_info                  | info
 amount                      | 478.8                
 payment_type                | null                 
 timeA                       | 2021-03-10 01:34:26
 timeB                       | 2021-03-10 01:35:26  

-RECORD 1-------------------------------------------

 id                          | 12            
 order_number                | 2272                 
 order_date                  | 2021-03-09           
 store_id                    | abc666             
 employee_code               | 66_abc55               
 customer_name               | mike        
 contact_number              | 98          
 address                     | bar
 locality                    | bar
 postal_code                 | 11000734332              
 order_info                  | info
 amount_to_be_collected      | 0.34                 
 payment_type                | null                 
 timeA                       | 2021-03-10 00:18:04  
 timeB                       | 2021-03-10 03:21:06

我想做以下事情-
按雇员代码对记录进行分组,得到一个字典作为回报-

{"emp_code": [Record0, Record1, ....]}

i、 例如,将雇员代码作为键,将该雇员的所有记录列表作为值。
我正在为此写一份工作。我可以通过编程方式通过循环遍历所有记录并获取所需的字典来实现这一点,但这将花费大量时间。我想知道是否有一种方法可以通过使用一些高阶pyspark函数来实现这个结果?

hgqdbh6s

hgqdbh6s1#

使用Map

您可以创建一个具有基于 employee_code 一个结构或数组的值为:

df = df.select(map(col("employee_code"), struct("order_number", "order_date",,)).alias("complex_map"))

然后可以使用 selectExpr 作为Map:

df.selectExpr("complex_map['employee_code']").show(2)

结构可选:

为此,您需要先对complextypes进行一些转换 grouping by 基本上把结构从某物转变为:

DataFrame[order_number: string, employee_code: string, ....]>

变成这样:

DataFrame[employee_code: string, complex: struct<order_number:string,contact_number:int>]>

可以使用 struct 函数来自 from pyspark.sql.functions import struct :

from pyspark.sql.functions import struct

df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders"))

一旦在这种结构中有了它们,就可以执行group by并使用聚合函数collect\u list:

from pyspark.sql.functions import struct, collect_list

df.select(col("employee_code"), struct("order_number", "order_date", ...).alias("orders")).groupBy("employee_code").agg(collect_list("orders").alias("orders")

然后可以在结构中选择单个列,如下所示:

df.select(col("orders.order_number"))

甚至可以通过以下方式进行过滤:

df.select(col("employee_code")).where(col("orders.order_number") > 100)

如果你想回到原来的样子,看看 explode 函数,该函数接受一列数组并创建一行(其余值为重复值)

相关问题