sparksql组,map-reduce

fykwrbwg  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(252)

我有以下名为“data”的数据集:

+---------+-------------+------+
|   name  |      subject| mark |
+---------+-------------+------+
|     Anna|         math|    80|
|     Vlad|      history|    67|
|     Jack|          art|    78|
|    David|         math|    71|
|   Monica|          art|    65|
|     Alex|          lit|    59|
|     Mark|         math|    82|
+---------+-------------+------+

我想做一个Map缩小的工作。
结果如下:

Anna, David : 1
Anna, Mark : 1
David, mark: 1
Vlad, None : 1
Jack, Monica: 1
Alex, None : 1

我试着做到以下几点:

new_data = data.select(['name', 'subject']).show()

+---------+-------------+
|   name  |      subject| 
+---------+-------------+
|     Anna|         math|  
|     Vlad|      history|  
|     Jack|          art|   
|    David|         math|   
|   Monica|          art|    
|     Alex|          lit|    
|     Mark|         math|    
+---------+-------------+

data_new.groupBy('name','subject').count().show(10)

然而,这个命令并没有给出我所需要的。

zbdgwd5y

zbdgwd5y1#

您可以使用subject进行自左连接,获得不同的对,并添加一列 1 .

import pyspark.sql.functions as F

result = df.alias('t1').join(df.alias('t2'),
    F.expr("t1.subject = t2.subject and t1.name != t2.name"), 
    'left'
).select(
    F.concat_ws(
        ', ',
        F.greatest('t1.name', F.coalesce('t2.name', F.lit('None'))),
        F.least('t1.name', F.coalesce('t2.name', F.lit('None')))
    ).alias('pair')
).distinct().withColumn('val', F.lit(1))

result.show()
+------------+---+
|        pair|val|
+------------+---+
|  Alex, None|  1|
| Anna, David|  1|
|  Anna, Mark|  1|
|  None, Vlad|  1|
| David, Mark|  1|
|Jack, Monica|  1|
+------------+---+
iqjalb3h

iqjalb3h2#

过程可以是:
在数组中对具有相同主题的学生进行分组
打电话给 udf 函数创建数组项排列
添加一列,显示每个主题的数字
呼叫 explode 函数为数组中的每个项分别创建3行
让我们一步一步地做:第一步:分组

import pyspark.sql.functions as F

grouped_df = data_new.groupBy('subject').agg(F.collect_set('name').alias('students_array'))

第二步: udf 功能

from itertools import permutations
def permutatoin(df_col):
    result = sorted([e for e in set(permutations(df_col))])
    return result 
spark.udf.register("perWithPython", permutatoin)
grouped_df  = grouped_df.select('*', permutatoin('students_array'))

步骤3:为每个主题创建一个新的数字值列

grouped_df = grouped_df .withColumn('subject_no', F.rowNumber().over(Window.partitionBy('subject'))

步骤4:创建单独的行

grouped_df.select(grouped_df.subject_no, explode(grouped_df.students_array)).show(truncate=False)

相关问题