StackOverflowError in window functions after migrating to Spark 3.3 (Scala)

6tdlim6h · asked 8 months ago · Scala

I am migrating code from Apache Spark 2.4 to Spark 3.3 (in Azure Synapse).
Running the same code, it works in 2.4 but fails in 3.3. **I am running Spark 3.3 on a larger Apache Spark pool (cluster).** The code I run is the following (there are more withColumn calls, but this is a summary):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def windowTrans = Window.partitionBy("COD_X", "COD_Y", "date").
    orderBy($"COD_X", $"COD_Y", $"date")

def windowTransMinMax = Window.partitionBy("COD_X", "COD_Y", "date")
  
  
def dataframeAvg(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._

    val dfAvg = df.
    withColumn("row", row_number.over(windowTrans)).
    withColumn("LONG_HIST", count("row") over windowTrans).

    withColumn("AVG_COL1", avg("col1") over windowTrans).
    withColumn("AVG2", avg("col2") over windowTrans).
    withColumn("AVG3", avg("col3") over windowTrans).
    withColumn("AVG6", avg("col6") over windowTrans).

    ...

    withColumn("STD_COL1", stddev_samp("col1") over windowTrans).
    withColumn("STD2", stddev_samp("col2") over windowTrans).
    withColumn("STD3", stddev_samp("col3") over windowTrans).
    withColumn("STD6", stddev_samp("col6") over windowTrans).
    
    ...
    
    withColumn("MIN_COL1", min("col1") over windowTransMinMax).
    withColumn("MIN2", min("col2") over windowTransMinMax).
    withColumn("MIN3", min("col3") over windowTransMinMax).
    withColumn("MIN6", min("col6") over windowTransMinMax).
    
    ...
    where(col("row") === 1).
    select(/* List of columns */)
    
    dfAvg
}

The error is the following:

java.lang.StackOverflowError
      at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$5466/589672638.get$Lambda(Unknown Source)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:777)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:745)
      at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:868)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:868)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:747)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:745)
      at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:868)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:31)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:868)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:747)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)

I have been doing some research, but I cannot find the cause or how to solve this.


2w3rbyxf #1

It looks like your query plan is really long (with all of those withColumn transformations), and that is what is blowing the stack. If you look at the stack trace, you can see it doing the same thing over and over again. More information on what a StackOverflowError is can be found in this SO question.
If you really have to perform all the transformations you're attempting here, you can increase the stack size. The typical default is 1024KB, so you could increase it to 4MB by setting spark.driver.extraJavaOptions to -Xss4M. If you submit your application with spark-submit, you can do it like this:

spark-submit \
  --master ... \
  --conf "spark.driver.extraJavaOptions=-Xss4M" \
  ...
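
Since the question runs in an Azure Synapse notebook rather than through spark-submit, the option generally has to be applied through the Spark pool or session configuration so that it is in place before the driver JVM starts; setting it from an already-running session has no effect on the driver. As a quick sanity check you can read the value back from the running session. This is only a minimal sketch and assumes spark is the active SparkSession:

// Hypothetical sanity check: print the driver JVM options the session was started with.
// Returns an empty string if spark.driver.extraJavaOptions was never configured.
val driverOpts = spark.sparkContext.getConf.get("spark.driver.extraJavaOptions", "")
println(s"spark.driver.extraJavaOptions = '$driverOpts'")  // expect it to contain -Xss4M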

slwdgvem #2

While I agree with Koedit's answer above as to the why, you should rather prefer rewriting all of those withColumns into a single select. As per the evidence in this answer, withColumn can be very expensive.
It was already expensive in 2.4; Spark was simply fast enough that it didn't bother you too much. 3.3 just adds a StackOverflowError on top of the slow execution, and getting rid of the StackOverflowError does not stop the performance degradation.
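
A minimal sketch of what that rewrite could look like, reusing windowTrans, windowTransMinMax and a few of the column names from the question (the full list of expressions is omitted and the exact columns are only illustrative). Computing every window expression inside one select keeps the logical plan flat, instead of nesting one extra projection per withColumn:

// Hypothetical rewrite of dataframeAvg: a single select with all window expressions,
// instead of one withColumn (and one extra plan node) per derived column.
def dataframeAvgFlat(df: DataFrame): DataFrame = {
  val derived = Seq(
    row_number().over(windowTrans).as("row"),
    count(lit(1)).over(windowTrans).as("LONG_HIST"),  // count("row") is not possible here: "row" is created in this same select
    avg("col1").over(windowTrans).as("AVG_COL1"),
    stddev_samp("col1").over(windowTrans).as("STD_COL1"),
    min("col1").over(windowTransMinMax).as("MIN_COL1")
    // ... the remaining AVG*/STD*/MIN* expressions go here
  )
  df.select(col("*") +: derived: _*)
    .where(col("row") === 1)
    // .select(/* final list of columns */)
}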
