如何在spark中将多个sql函数 Package 成一个udf?

7vux5j2d  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(297)

我正在使用spark 2.3.2。
在Dataframe中的一列上,我依次执行许多spark.sql.functions。如何将此函数序列 Package 到用户定义函数(udf)中以使其可重用?
下面是我的示例,重点是一列“columnname”。首先,我创建我的测试数据:

val testSchema = new StructType()
  .add("columnName", new StructType()
    .add("2020-11", LongType)
    .add("2020-12", LongType)
  )

val testRow = Seq(Row(Row(1L, 2L)))
val testRDD = spark.sparkContext.parallelize(testRow)
val testDF = spark.createDataFrame(testRDD, testSchema)
testDF.printSchema()

/*
root
 |-- columnName: struct (nullable = true)
 |    |-- 2020-11: long (nullable = true)
 |    |-- 2020-12: long (nullable = true)

* /

testDF.show(false)

/*
+----------+
|columnName|
+----------+
|[1, 2]    |
+----------+

* /

下面是应用的spark sql函数序列(仅作为示例):

val testResult = testDF
  .select(explode(split(regexp_replace(to_json(col("columnName")), "[\"{}]", ""), ",")).as("result"))

我无法创建自定义项“myudf”,以便在调用时得到相同的结果

val testResultWithUDF = testDF.select(myUDF(col("columnName"))

这就是我“想”做的:

def parseAndExplode(spalte: Column): Column = {
  explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ",")
}
val myUDF = udf(parseAndExplode _)

testDF.withColumn("udf_result", myUDF(col("columnName"))).show(false)

但它抛出了一个例外:

Schema for type org.apache.spark.sql.Column is not supported
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

还尝试使用 Row 作为输入参数,但再次尝试应用内置sql函数失败。

k10s72fa

k10s72fa1#

这里不需要使用自定义项。 explode , split 而且org.apache.spark.sql.functions中的大多数其他函数已经返回了column类型的对象。

def parseAndExplode(spalte: Column): Column = {
  explode(split(regexp_replace(to_json(spalte), "[\"{}]", ""), ","))
}

testDF.withColumn("udf_result",parseAndExplode('columnName)).show(false)

印刷品

+----------+----------+
|columnName|udf_result|
+----------+----------+
|[1, 2]    |2020-11:1 |
|[1, 2]    |2020-12:2 |
+----------+----------+

相关问题