我正在使用spark 2.3.2。
在Dataframe中的一列上,我依次执行许多spark.sql.functions。如何将此函数序列 Package 到用户定义函数(udf)中以使其可重用?
下面是我的示例,重点是一列“columnname”。首先,我创建我的测试数据:
val testSchema = new StructType()
.add("columnName", new StructType()
.add("2020-11", LongType)
.add("2020-12", LongType)
)
val testRow = Seq(Row(Row(1L, 2L)))
val testRDD = spark.sparkContext.parallelize(testRow)
val testDF = spark.createDataFrame(testRDD, testSchema)
testDF.printSchema()
/*
root
|-- columnName: struct (nullable = true)
| |-- 2020-11: long (nullable = true)
| |-- 2020-12: long (nullable = true)
* /
testDF.show(false)
/*
+----------+
|columnName|
+----------+
|[1, 2] |
+----------+
* /
下面是应用的spark sql函数序列(仅作为示例):
val testResult = testDF
.select(explode(split(regexp_replace(to_json(col("columnName")), "[\"{}]", ""), ",")).as("result"))
我无法创建自定义项“myudf”,以便在调用时得到相同的结果
val testResultWithUDF = testDF.select(myUDF(col("columnName"))
这就是我“想”做的:
def parseAndExplode(spalte: Column): Column = {
explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ",")
}
val myUDF = udf(parseAndExplode _)
testDF.withColumn("udf_result", myUDF(col("columnName"))).show(false)
但它抛出了一个例外:
Schema for type org.apache.spark.sql.Column is not supported
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
还尝试使用 Row
作为输入参数,但再次尝试应用内置sql函数失败。
1条答案
按热度按时间k10s72fa1#
这里不需要使用自定义项。
explode
,split
而且org.apache.spark.sql.functions中的大多数其他函数已经返回了column类型的对象。印刷品