Merge PySpark DataFrame ArrayType fields into a single ArrayType field

z31licg0 asked on 2021-07-09 in Spark

I have a PySpark DataFrame with 2 ArrayType fields:

>>>df
DataFrame[id: string, tokens: array<string>, bigrams: array<string>]
>>>df.take(1)
[Row(id='ID1', tokens=['one', 'two', 'two'], bigrams=['one two', 'two two'])]

I would like to merge them into a single ArrayType field:

>>>df2
DataFrame[id: string, tokens_bigrams: array<string>]
>>>df2.take(1)
[Row(id='ID1', tokens_bigrams=['one', 'two', 'two', 'one two', 'two two'])]

The syntax that works for string columns does not seem to work here:

df2 = df.withColumn('tokens_bigrams', df.tokens + df.bigrams)

Thanks!
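At the level of a single row, the desired result is plain list concatenation; the question is how to express that per row in Spark, since `+` on DataFrame columns means numeric addition rather than array concatenation. A minimal row-level sketch:

```python
# Row-level view of the desired merge: Python list "+" already
# produces the wanted result; the DataFrame-level equivalent is
# what the answers below provide.
tokens = ['one', 'two', 'two']
bigrams = ['one two', 'two two']
tokens_bigrams = tokens + bigrams
print(tokens_bigrams)  # ['one', 'two', 'two', 'one two', 'two two']
```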


bxgwgixi1#

As of Spark 2.4.0 (2.3 on the Databricks platform), you can do this natively in the DataFrame API with the concat function. In your example you would do:

from pyspark.sql.functions import col, concat

df.withColumn('tokens_bigrams', concat(col('tokens'), col('bigrams')))

The relevant JIRA ticket is SPARK-23736.


bxgwgixi2#

Spark >= 2.4
You can use the concat function (SPARK-23736):

from pyspark.sql.functions import col, concat 

df.select(concat(col("tokens"), col("tokens_bigrams"))).show(truncate=False)

# +---------------------------------+
# |concat(tokens, tokens_bigrams)   |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |null                             |
# +---------------------------------+
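The `null` row reflects SQL semantics: when any input array is NULL, `concat` returns NULL for the whole result. As a plain-Python analogy (a hypothetical helper for illustration, not Spark code):

```python
def sql_like_array_concat(*arrays):
    # Mirrors Spark's concat on array columns: any NULL (None) input
    # makes the whole result NULL; otherwise the arrays are joined.
    if any(a is None for a in arrays):
        return None
    return [x for a in arrays for x in a]
```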

When one of the values is NULL, you can coalesce each column with an empty array:

from pyspark.sql.functions import array, coalesce, col, concat

df.select(concat(
    coalesce(col("tokens"), array()),
    coalesce(col("tokens_bigrams"), array())
)).show(truncate = False)

# +--------------------------------------------------------------------+
# |concat(coalesce(tokens, array()), coalesce(tokens_bigrams, array()))|
# +--------------------------------------------------------------------+
# |[one, two, two, one two, two two]                                   |
# |[three]                                                             |
# +--------------------------------------------------------------------+

Spark < 2.4
Unfortunately, concatenating array columns in the general case requires a UDF, for example:

from itertools import chain
from pyspark.sql.functions import col, udf
from pyspark.sql.types import *

def concat(type):
    # Build a UDF that flattens its array arguments, treating NULL as [].
    def concat_(*args):
        return list(chain.from_iterable((arg if arg else [] for arg in args)))
    return udf(concat_, ArrayType(type))

which can be used as:

df = spark.createDataFrame(
    [(["one", "two", "two"], ["one two", "two two"]), (["three"], None)], 
    ("tokens", "tokens_bigrams")
)

concat_string_arrays = concat(StringType())
df.select(concat_string_arrays("tokens", "tokens_bigrams")).show(truncate=False)

# +---------------------------------+
# |concat_(tokens, tokens_bigrams)  |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |[three]                          |
# +---------------------------------+
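The body of the UDF is ordinary Python, so its NULL-as-empty behavior can be checked without a Spark session:

```python
from itertools import chain

def concat_(*args):
    # Same logic as the UDF body above: replace None with [] and flatten.
    return list(chain.from_iterable((arg if arg else [] for arg in args)))

print(concat_(['one', 'two', 'two'], ['one two', 'two two']))
# ['one', 'two', 'two', 'one two', 'two two']
print(concat_(['three'], None))
# ['three']
```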
