pyspark dataframe从一列字符串中提取每个不同的单词,并将它们放入一个新的dataframe中

6psbrbz9  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(534)

我试图在pysparkDataframe中查找列中的所有字符串。
输入df:

id val 
 1  "book bike car"
 15 "car TV bike"

我需要一个像这样的输出df:(word\u index value是自动递增索引,“val\u new”中的值的顺序是随机的。)

val_new word_index
 TV      1
 car     2
 bike    3
 book    4

我的代码:

import pyspark.sql.functions as F
 from pyspark.sql.types import  ArrayType, StringType
 import re

 def my_f(col):
     if not col: 
         return 
     s = ''
     if isinstance(col, str):
         s = re.sub('[^a-zA-Z0-9]+', ' ', col).split()
     return s 

 my_udf = F.udf(my_f, ArrayType(StringType()))

 df = spark.createDataFrame([(1, 'book bike car'), (18, 'car TV bike')], ['id', 'val'])
 df = df.withColumn('val_new', my_udf(F.col('val')))

我已经将字符串转换为数组,但是如何从每行中提取单词、删除重复项,以及使用两个新列创建一个新的dataframe?
我不想使用groupby和aggregate,因为Dataframe可能很大,我不需要“id”列和任何重复的“val”。
谢谢

t8e9dugd

t8e9dugd1#

这可以是一个工作的解决方案,供您使用 spark 在构建函数中使用udf,这最终会使应用程序变慢。功能
explode() groupBy()collect_set() 会帮助你达到预期的效果。

在这里创建df

df = spark.createDataFrame([(1, 'book bike car'), (18, 'car TV bike')], ['id', 'val'])
df = df.withColumn("dummy_col", F.lit(1))
df.show()
+---+-------------+---------+
| id|          val|dummy_col|
+---+-------------+---------+
|  1|book bike car|        1|
| 18|  car TV bike|        1|
+---+-------------+---------+

逻辑在这里


# Add a dummy column to groupBy & in a single line

df = df.withColumn("array_col", F.split("val", " "))

# Collect_set will return you an array without duplicates

df_grp = df.groupBy("dummy_col").agg(F.collect_set("array_col").alias("array_col"))

# explode to transpoe the column

df_grp = df_grp.withColumn("explode_col", F.explode("array_col"))
df_grp = df_grp.withColumn("explode_col", F.explode("explode_col"))

# Distince to remove the duplicates

df_grp = df_grp.select("explode_col").distinct()

# another dummy column to create the row number

df_grp = df_grp.withColumn("dummy_col", F.lit("A"))
_w = W.partitionBy("dummy_col").orderBy("dummy_col")
df_grp = df_grp.withColumn("rnk", F.row_number().over(_w))
df_grp.show(truncate=False)

最终输出

+-----------+---------+---+
|explode_col|dummy_col|rnk|
+-----------+---------+---+
|TV         |A        |1  |
|car        |A        |2  |
|bike       |A        |3  |
|book       |A        |4  |
+-----------+---------+---+

相关问题