我试图在我的pyspark框架中的结构数组中添加一个新的列'parsed_date'(解析为日期的字符串)。为了做到这一点,我使用dateparser.parse函数,因为我的日期可能具有不可预测的格式。我使用transform()和withField()来处理数组。
我尝试了以下代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, transform
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DateType
import dateparser
# Sample dataframe
data = [
(1, [{"date_field": "january 2023", "detail": "detail1"}, {"date_field": "2011", "detail": "detail2"}]),
(2, [{"date_field": "2021-07-15", "detail": "detail3"}])
]
schema = StructType([
StructField("id", StringType(), True),
StructField("array_of_structs", ArrayType(
StructType([
StructField("date_field", StringType(), True),
StructField("detail", StringType(), True)
])
), True)
])
df = spark.createDataFrame(data, schema)
# UDF function to parse the dates
@udf
def my_udf(x):
return dateparser.parse(x)
# Applying the UDF for array of structs
result = df.withColumn("array_of_structs", transform(
"array_of_structs",
lambda x: x.withField("parsed_date", my_udf(x["date_field"]))
))
result.show(truncate=False)
字符串
但我得到以下错误:
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: my_udf(lambda x_24#970.date_field)#968
型
我不知道如何使用transform()函数与UDF一起使用。任何帮助都将不胜感激!
1条答案
按热度按时间cedebl8k1#
您不能将
F.transform
与Python UDF一起使用,您必须为您的UDF提供returnType
。您可以调整下面的示例以使用dateparser
。字符串