在pyspark上将一列拆分为更多列时出现问题

2cmtqfgy  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(557)

在pyspark中将Dataframe的列拆分为更多列时遇到问题:
我有一个列表列表,我想把它转换成一个Dataframe,每个值在一列中。
我尝试过:
我从这个列表中创建了一个Dataframe:

[['COL-4560', 'COL-9655', 'NWG-0610', 'D81-3754'],
 ['DLL-7760', 'NAT-9885', 'PED-0550', 'MAR-0004', 'LLL-5554']]

使用此代码:

from pyspark.sql import Row
R = Row('col1', 'col2')

# use enumerate to add the ID column

df_from_list = spark.createDataFrame([R(i, x) for i, x in enumerate(recs_list)])

我得到的结果是:

+----+--------------------+
|col1|                col2|
+----+--------------------+
|   0|[COL-4560, COL-96...|
|   1|[DLL-7760, NAT-98...|
+----+--------------------+

我想用逗号将值分隔成列,所以我尝试了:

from pyspark.sql import functions as F

df2 = df_from_list.select('col1', F.split('col2', ', ').alias('col2'))

# If you don't know the number of columns:

df_sizes = df2.select(F.size('col2').alias('col2'))
df_max = df_sizes.agg(F.max('col2'))
nb_columns = df_max.collect()[0][0]

df_result = df2.select('col1', *[df2['col2'][i] for i in range(nb_columns)])
df_result.show()

但我在这条线上有个错误 df2 = df_from_list.select('col1', F.split('col2', ', ').alias('col2')) :

AnalysisException: cannot resolve 'split(`col2`, ', ', -1)' due to data type mismatch: argument 1 requires string type, however, '`col2`' is of array<string> type.;;

我的理想最终输出如下:

+----------+----------+----------+----------+----------+
|  SKU     |  REC_01  | REC_02   | REC_03   | REC_04   |
+----------+----------+----------+----------+----------+
| COL-4560 | COL-9655 | NWG-0610 | D81-3754 | null     |
| DLL-7760 | NAT-9885 | PED-0550 | MAR-0004 | LLL-5554 |
+---------------------+----------+----------+----------+

有些行可能有四个值,但有些行有更多或更少的值,我不知道最终Dataframe将有多少列。
有人知道发生了什么事吗?事先非常感谢。

y53ybaqx

y53ybaqx1#

Dataframe
df_from_list col2 列已存在 array 类型,因此无需拆分(因为拆分与stringtype一起工作,这里我们有arraytype)。
以下是对你有用的步骤。

recs_list=[['COL-4560', 'COL-9655', 'NWG-0610', 'D81-3754'],
 ['DLL-7760', 'NAT-9885', 'PED-0550', 'MAR-0004', 'LLL-5554']]

from pyspark.sql import Row
R = Row('col1', 'col2')

# use enumerate to add the ID column

df_from_list = spark.createDataFrame([R(i, x) for i, x in enumerate(recs_list)])

from pyspark.sql import functions as F

df2 = df_from_list

# If you don't know the number of columns:

df_sizes = df2.select(F.size('col2').alias('col2'))
df_max = df_sizes.agg(F.max('col2'))
nb_columns = df_max.collect()[0][0]

cols=['SKU','REC_01','REC_02','REC_03','REC_04']
df_result = df2.select(*[df2['col2'][i] for i in range(nb_columns)]).toDF(*cols)
df_result.show()

# +--------+--------+--------+--------+--------+

# |     SKU|  REC_01|  REC_02|  REC_03|  REC_04|

# +--------+--------+--------+--------+--------+

# |COL-4560|COL-9655|NWG-0610|D81-3754|    null|

# |DLL-7760|NAT-9885|PED-0550|MAR-0004|LLL-5554|

# +--------+--------+--------+--------+--------+

相关问题