在从spark1.6迁移到spark2.2的过程中,我遇到了这个问题。
spark 1.6的实现有两个自定义项:
sparksql udf(通过sqlcontext.udf().register(…)注册的org.apache.spark.sql.api.java.udf2的实现)
为配置单元设计的java自定义udf(通过hivecontext.sql(…)注册的org.apache.hadoop.hive.ql.udf.generic.genericudtf的实现)
两个udf都是通过一个同名foo的上下文注册的
使用spark 1.6的java示例:
public static void register(SQLContext sqlContext) {
sqlContext.udf().register("foo", new Foo(), DataTypes.StringType);
if (sqlContext instanceof HiveContext) {
HiveContext hiveContext = (HiveContext) sqlContext;
hiveContext.sql(
"create temporary function foo as 'com.FooHive'");
}
}
我试着在sparksession(spark 2.2)上也这么做:
public static void register(SparkSession sparkSession) {
sparkSession.udf().register("foo", new Foo(), DataTypes.StringType);
sparkSession.sql(
"create temporary function foo as 'com.FooHive'");
}
此代码导致异常:
org.apache.spark.sql.AnalysisException: Function foo already exists;
有没有办法解决sparksession这个问题?
1条答案
按热度按时间q5iwbnjs1#
在spark2.3中,对于临时函数,这个问题似乎已经解决了。如果您在分支中看到2.2正在使用
registerFunction
https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/udfregistration.scala但在2.3中,他们把它改成了
createOrReplaceTempFunction
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/udfregistration.scala这只是看到代码,我希望它有帮助。