Pyspark transform function with UDF not working

u0njafvf · asked 6 months ago · in Spark

I'm trying to add a new column 'parsed_date' (the string parsed as a date) to each struct in an array of structs in my PySpark DataFrame. Because my dates can come in unpredictable formats, I use the dateparser.parse function, and I process the array with transform() and withField().
I tried the following code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, transform
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DateType
import dateparser

# Sample dataframe

data = [
    (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
    (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("array_of_structs", ArrayType(
        StructType([
            StructField("date_field", StringType(), True),
            StructField("detail", StringType(), True)
        ])
    ), True)
])

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

# UDF function to parse the dates
@udf
def my_udf(x):
    return dateparser.parse(x)

# Applying the UDF for array of structs
result = df.withColumn("array_of_structs", transform(
    "array_of_structs",
    lambda x: x.withField("parsed_date", my_udf(x["date_field"]))  
))

result.show(truncate=False)

But I get the following error:

org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: my_udf(lambda x_24#970.date_field)#968


I don't understand how to use the transform() function together with a UDF. Any help would be greatly appreciated!

cedebl8k1#

You can't use F.transform with a Python UDF: Spark's higher-order functions only accept native Column expressions, not Python UDFs. You also have to give your UDF a returnType. Instead, apply the UDF to the whole array column; you can adapt the example below to use dateparser:

from datetime import datetime

from pyspark.sql import types as T
from pyspark.sql import functions as F

data = [
    (1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
    (2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]

schema = T.StructType([
    T.StructField('id', T.StringType(), True),
    T.StructField('array_of_structs', T.ArrayType(
        T.StructType([
            T.StructField('date_field', T.StringType(), True),
            T.StructField('detail', T.StringType(), True)
        ])
    ), True)
])

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)

return_type = T.ArrayType(
    T.StructType([
        T.StructField('date_field', T.StringType(), True),
        T.StructField('detail', T.StringType(), True),
        T.StructField('parsed_date', T.DateType(), True)
    ]), True
)

# The UDF receives the whole array and returns it with the extra field;
# returnType must describe the new struct, including parsed_date.
@F.udf(returnType=return_type)
def my_udf(array_struct: list) -> list:
    array = []
    for struct in array_struct:
        try:
            parsed_date = datetime.fromisoformat(struct['date_field'])
        except ValueError:
            parsed_date = None

        array.append({
            'date_field': struct['date_field'],
            'detail': struct['detail'],
            'parsed_date': parsed_date
        })

    return array

new_struct = my_udf('array_of_structs').alias('new_array_of_structs')
df.select(new_struct).show(2, False)

# +------------------------------------------------------+
# |new_array_of_structs                                  |
# +------------------------------------------------------+
# |[{january 2023, detail1, null}, {2011, detail2, null}]|
# |[{2021-07-15, detail3, 2021-07-15}]                   |
# +------------------------------------------------------+
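If dateparser isn't available, the fallback logic inside the UDF can be sketched with the standard library alone. The helper below, `parse_loose`, is a hypothetical stdlib-only stand-in for `dateparser.parse`; it assumes your data only uses formats you can enumerate up front, which is exactly the limitation dateparser removes:

```python
from datetime import datetime

# Hypothetical stand-in for dateparser.parse: try a handful of
# known formats in order and return None when nothing matches.
# strptime month-name matching (%B) is case-insensitive.
def parse_loose(s):
    for fmt in ("%Y-%m-%d", "%B %Y", "%Y"):
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None

print(parse_loose("2021-07-15"))    # 2021-07-15
print(parse_loose("january 2023"))  # 2023-01-01
print(parse_loose("2011"))          # 2011-01-01
print(parse_loose("not a date"))    # None
```

Inside the UDF you would replace the `datetime.fromisoformat(...)` call with `parse_loose(struct['date_field'])`; the returnType stays the same since the helper still yields a `date` or `None`.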
