我试图在pysparkDataframe中查找列中的所有字符串。
输入df:
id val
1 "book bike car"
15 "car TV bike"
我需要一个像这样的输出df:(word\u index value是自动递增索引,“val\u new”中的值的顺序是随机的。)
val_new word_index
TV 1
car 2
bike 3
book 4
我的代码:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
import re
def my_f(col):
if not col:
return
s = ''
if isinstance(col, str):
s = re.sub('[^a-zA-Z0-9]+', ' ', col).split()
return s
my_udf = F.udf(my_f, ArrayType(StringType()))
df = spark.createDataFrame([(1, 'book bike car'), (18, 'car TV bike')], ['id', 'val'])
df = df.withColumn('val_new', my_udf(F.col('val')))
我已经将字符串转换为数组,但是如何从每行中提取单词、删除重复项,以及使用两个新列创建一个新的dataframe?
我不想使用groupby和aggregate,因为Dataframe可能很大,我不需要“id”列和任何重复的“val”。
谢谢
1条答案
按热度按时间t8e9dugd1#
这可以是一个工作的解决方案,供您使用
spark
在构建函数中使用udf,这最终会使应用程序变慢。功能explode()
groupBy()
与collect_set()
会帮助你达到预期的效果。在这里创建df
逻辑在这里
最终输出