使用嵌入式derby元存储时未清理spark后台线程

2fjabf4q  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(148)

在我开始提问之前,先介绍一下背景知识:
我参与了一个使用自举spark应用程序的项目,因此,我们有很多专门用于sparksession初始化的逻辑。因此,对于单元测试,我们启动/关闭许多会话。
我们的ci构建最近确实特别慢/有缺陷,经过一些调查,我们发现由于失控的守护进程线程创建/持久性,我们正在达到资源限制。下面是一个简短的代码片段,再现了这个问题:

import java.io.File
import java.nio.file.Files
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class TestSessionCleanupTest extends FunSuite {

  private def createAndStopSparkSession(): Unit = {
    setUpDerby()
    val session = SparkSession.builder()
      .appName("session_cleanup_test")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()
    session.sql("show databases")
    session.stop()
  }

  private def setUpDerby(): Unit = {
    val tempDir = Files.createTempDirectory("derby")
    tempDir.toFile.deleteOnExit() // deleting eagerly after session.stop() makes no difference
    System.setProperty("derby.system.home", tempDir.toString)
  }

  test("Start/stop a bunch of sessions") {
    Range(1,50).foreach { idx =>
      println(s"Starting session $idx")
      createAndStopSparkSession()
      println(s"Stopped session $idx")
    }
  }
}

使用JavaVisualVM,您可以看到线程计数和堆大小随着时间的推移而稳步增加:线程和堆状态
似乎没有得到清理/增长的特定守护程序线程包括:
BoneCP-keep-alive-scheduler BoneCP-pool-watch-thread com.google.common.base.internal.Finalizer derby.rawStoreDaemon (少了一些,但还是一堆)
当然,如果我把 spark.sql("show databases") 语句,则不会发生此行为。因此,这显然是jdbc连接池到嵌入式derby元存储的问题。
我在这个例子中使用了spark2.4.4,但是fwiw我们在spark3上也看到了这种行为。

name := "Test Session Clean Up"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.4.4",
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-hive" % "2.4.4",
  "org.scalatest" %% "scalatest" % "3.1.0" % Test
)

最后,我的问题是:
你知道为什么会这样吗?
有什么好办法吗?
有没有可能的创可贴/解决方法(而不是在所有单元测试之间共享单个会话/上下文)
如果可能的话,我们仍然希望使用嵌入式元存储,但是因为这只用于单元测试——而不是生产——我们可以接受一个黑客/不雅的解决方案(例如,我已经尝试过手动中断和/或终止相关线程,但没有运气)。
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题