sparksessionextensions函数

tcomlyy6  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(401)

sparksessionextensions函数在本地工作,但我无法在databricks环境中工作。
itachi项目定义了catalyst表达式,如 age 我可以通过 spark-sql :

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990');
10 years

我在databricks环境下工作有困难。
我和 spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions 配置选项集。

然后我把图书馆连了起来。

这个 array_append itachi中定义的函数无法像我预期的那样访问:

确认配置选项设置正确:

spark alchemy有另一种在databricks环境中工作的方法。我们需要在databricks环境中使用spark内部构件吗?还是有办法 injectFunction 在数据库里工作?

wj8zmpe1

wj8zmpe11#

这个 spark.sql.extensions 在完整的databricks上工作得很好(直到它深入到spark的内部——有时存在不兼容),但在communityedition上就不行了。问题是 spark.sql.extensions 在会话初始化过程中调用,然后安装ui中指定的库,所以这在初始化之后/与初始化并行发生。在完整的databricks上,在集群启动之前使用init脚本安装库是一种变通方法,但是这个功能在communityedition上不可用。
解决方法是显式注册函数,如下所示:

%scala
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)

在那之后它工作了:

它不像扩展那么方便,但这是ce的一个限制。

相关问题