I am migrating code from Apache Spark 2.4 to Spark 3.3 (in Azure Synapse).
Running the same code, it works in 2.4 but fails in 3.3. **I am running Spark 3.3 on a larger Apache Spark pool (cluster).** The code I am running is the following (there are more withColumn calls, but this is a summary):
```scala
def windowTrans = Window.partitionBy("COD_X", "COD_Y", "date")
  .orderBy($"COD_X", $"COD_Y", $"date")

def windowTransMinMax = Window.partitionBy("COD_X", "COD_Y", "date")

def dataframeAvg(spark: SparkSession, df: DataFrame): DataFrame = {
  import spark.implicits._
  val dfAvg = df
    .withColumn("row", row_number.over(windowTrans))
    .withColumn("LONG_HIST", count("row") over windowTrans)
    .withColumn("AVG_COL1", avg("col1") over windowTrans)
    .withColumn("AVG2", avg("col2") over windowTrans)
    .withColumn("AVG3", avg("col3") over windowTrans)
    .withColumn("AVG6", avg("col6") over windowTrans)
    // ...
    .withColumn("STD_COL1", stddev_samp("col1") over windowTrans)
    .withColumn("STD2", stddev_samp("col2") over windowTrans)
    .withColumn("STD3", stddev_samp("col3") over windowTrans)
    .withColumn("STD6", stddev_samp("col6") over windowTrans)
    // ...
    .withColumn("MIN_COL1", min("col1") over windowTransMinMax)
    .withColumn("MIN2", min("col2") over windowTransMinMax)
    .withColumn("MIN3", min("col3") over windowTransMinMax)
    .withColumn("MIN6", min("col6") over windowTransMinMax)
    // ...
    .where(col("row") === 1)
    .select(/* list of columns */)
  dfAvg
}
```
The error is the following:
java.lang.StackOverflowError at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$5466/589672638.get$Lambda(Unknown Source)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:777)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:745)
at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:868)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:868)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:747)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:745)
at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:868)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:868)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:747)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
I have been researching this, but I cannot find the cause or how to solve it.
2 Answers
2w3rbyxf1#
It looks like your query plan is really long (with all of those `withColumn` transformations), and your stack is overflowing because of it. If you look at the error's stack trace, you can see it doing the same operations over and over again. More info on what a StackOverflowError is can be found in this SO question. If you really have to do all the transformations you are attempting here, you can increase the stack size. A typical default is 1024KB, so you can increase it to 4M by setting `spark.driver.extraJavaOptions` to `-Xss4M`. If you use spark-submit to submit your application, you can do it like this:
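A minimal sketch of that spark-submit invocation (the main class `com.example.MyApp` and jar name `my-app.jar` are hypothetical placeholders, not from the question):

```shell
# Raise the driver JVM's thread stack size from the default (~1024KB) to 4MB,
# so cloning a deeply nested query plan no longer overflows the stack.
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Xss4M" \
  --class com.example.MyApp \
  my-app.jar
```

In Azure Synapse, where you do not call spark-submit yourself, the equivalent would be setting `spark.driver.extraJavaOptions` in the pool's Spark configuration.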
slwdgvem2#
While I agree with Koedit's answer as to the why, this is also why you should prefer rewriting those chained withColumn calls as a simple select. As evidenced in this answer, the pattern can be very expensive.
It was already expensive in 2.4; Spark was just fast enough that it didn't bother you much. 3.3 merely adds the StackOverflowError on top of the slow execution, and getting rid of the error does not stop the performance degradation.
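A minimal sketch of that rewrite, assuming the column names from the question (`col1`, `col2`, `col3`, `col6`) and the same two window specs. Building all the window expressions as a list and applying them in a single select keeps the logical plan one projection deep, instead of nesting one projection per withColumn:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def dataframeAvg(spark: SparkSession, df: DataFrame): DataFrame = {
  val windowTrans = Window.partitionBy("COD_X", "COD_Y", "date")
    .orderBy("COD_X", "COD_Y", "date")
  val windowTransMinMax = Window.partitionBy("COD_X", "COD_Y", "date")

  // One expression per statistic, generated programmatically instead of
  // one withColumn call per column.
  val cols = Seq("col1", "col2", "col3", "col6")
  val avgs = cols.map(c => avg(c).over(windowTrans).as(s"AVG_$c"))
  val stds = cols.map(c => stddev_samp(c).over(windowTrans).as(s"STD_$c"))
  val mins = cols.map(c => min(c).over(windowTransMinMax).as(s"MIN_$c"))
  val rowNum = row_number().over(windowTrans).as("row")

  // A single select: the analyzer sees one flat projection, not dozens
  // of nested ones, so cloning the plan stays cheap.
  df.select(col("*") +: rowNum +: (avgs ++ stds ++ mins): _*)
    .where(col("row") === 1)
}
```

The remaining columns hidden behind the question's `...` would be added to the `cols` sequence the same way.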