动态展开结构化流pyspark中的arraytype()列

2guxujil  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(369)

我有以下Dataframe:

root
 |-- sents: array (nullable = false)
 |    |-- element: integer (containsNull = true)
 |-- metadata: array (nullable = true)
 |    |-- element: float (containsNull = true)

+----------+---------------------+
|sents     |metadata             |
+----------+---------------------+
|[1, -1, 0]|[0.4991, 0.5378, 0.0]|
|[-1]      |[0.6281]             |
|[-1]      |[0.463]              |
+----------+---------------------+

我想动态地将每个数组项展开到它自己的列中,以便它可以如下所示:

+--------+--------+--------+-----------+-----------+-----------+
|sents[0]|sents[1]|sents[2]|metadata[0]|metadata[1]|metadata[2]|
+--------+--------+--------+-----------+-----------+-----------+
|       1|      -1|       0|     0.4991|     0.5378|        0.0|
|      -1|    null|    null|     0.6281|       null|       null|
|      -1|    null|    null|      0.463|       null|       null|
+--------+--------+--------+-----------+-----------+-----------+

但在结构化流媒体中,动态执行操作有许多限制:
我尝试了以下方法:

numcol = df.withColumn('phrasesNum', F.size('sents')).agg(F.max('phrasesNum')).head()
df = df.select(*[F.col('sents')[i] for i in range(numcol[0])],*[F.col('metadata')[i] for i in range(numcol[0])])

也:

df_sizes = df.select(F.size('sents').alias('sents'))
df_max = df_sizes.agg(F.max('sents'))
nb_columns = df_max.collect()[0][0]

d = c.select(*[F.map_values(c['metadata'][i]).getItem(0).alias('confidenceIntervals'+"{}".format(j)).cast(DoubleType()) for i,j in enumerate(range(F.size('sents')))],
             *[c['sents'][i].alias('phraseSents'+"{}".format(j)).cast(IntegerType()) for i,j in enumerate(range(nb_columns))])

但是我不能在结构化流中使用.head()、.collect()或.take()之类的东西来创建表示要动态创建的列数的数字变量。有什么想法吗??
谢谢大家

a5g8bdjr

a5g8bdjr1#

只有这样你才能做到 without collecting to driver 节点(首先,采取,收集等),是如果你知道 columns you need 或者 max size of each array column. 这里我假设两列都有 max size of 3 ,需要列 0,1,2. 在流媒体中,Dataframe之间不能有不同的模式(列)。

cols=['0','1','2']

from pyspark.sql import functions as F
df.withColumn("struct1", F.struct(*[F.struct((F.col("sents")[int(x)]).alias('sents[{}]'.format(x))) for x in cols]))\
  .withColumn("struct2", F.struct(*[F.struct((F.col("metadata")[int(x)]).alias('metadata[{}]'.format(x))) for x in cols]))\
  .select(*["struct1.{}.*".format(x) for x in ['col{}'.format((int(x)+1)) for x in cols]],
          *["struct2.{}.*".format(x) for x in ['col{}'.format((int(x)+1)) for x in cols]]).show()

# +--------+--------+--------+-----------+-----------+-----------+

# |sents[0]|sents[1]|sents[2]|metadata[0]|metadata[1]|metadata[2]|

# +--------+--------+--------+-----------+-----------+-----------+

# |       1|      -1|       0|     0.4991|     0.5378|        0.0|

# |      -1|    null|    null|     0.6281|       null|       null|

# |      -1|    null|    null|      0.463|       null|       null|

# +--------+--------+--------+-----------+-----------+-----------+

相关问题