spark流不恢复挂起的批处理(textfilestream)

tquggr8v  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(261)

我使用spark2.3.1来处理文本文件流。上下文设置如下:

val ssc = new StreamingContext(sparkSession.sparkContext, 
    Seconds(config.sparkStreamingBatchTime))
ssc.checkpoint(config.sparkCheckpointLocation)

一切正常,除非没有足够的资源给工作和批开始堆积。解决办法是结束工作,提供更多资源,然后重新开始。
不幸的是,执行此操作时,已触发但尚未处理的挂起批将丢失。
示例:我每分钟安排一次只执行一个任务的作业 Thread.sleep(3 * 60 * 1000) (三分钟)。批次开始堆积,如预期的那样。
为批处理引入新文件:

20/09/23 08:39:00 DEBUG FileInputDStream: Getting new files for time 1600850340000, ignoring files older than 1600850280000
20/09/23 08:39:00 DEBUG FileInputDStream: hdfs://nameservice1/[...]/test_1 accepted with mod time 1600850308293
20/09/23 08:39:00 INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 18 ms

当文件集被持久化时,检查点系统显然在工作:

20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]

然后,我终止了这个作业(显然很优雅,因为调用了shudown钩子):

20/09/23 08:40:00 INFO JobScheduler: Stopped JobScheduler
20/09/23 08:40:00 INFO StreamingContext: StreamingContext stopped successfully
20/09/23 08:40:00 INFO SparkContext: Invoking stop() from shutdown hook
[...]
20/09/23 08:40:00 INFO org.apache.spark.util.ShutdownHookManager: Shutdown hook called

最后,当我再次启动作业时,由于文件的时间戳,该文件被忽略,但它最初没有被处理!

20/09/23 08:44:00 DEBUG FileInputDStream: Getting new files for time 1600850640000, ignoring files older than 1600850580000
20/09/23 08:44:00 DEBUGFileInputDStream: hdfs://nameservice1/[...]/test_1 ignored as mod time 1600850308293 <= ignore time 1600850580000
20/09/23 08:44:00 INFO FileInputDStream: Finding new files took 3 ms

有一种解决方法涉及到“触摸”或复制未处理的文件,但在生产中维护它是一场噩梦。您是否认为我在关机时丢失了某些内容,因此挂起的批处理将持久化到checkpoint文件夹中?是否可以覆盖时间戳检查?其他想法?
谢谢!
更新09/24/2020:我有一个解决办法,但真的很难看。基于这个问题,我设法得到了每批供应的rdd生成文件:

/**
  * Recursive method to extract original metadata files involved in this batch.
  * @param rdd Each RDD created for each batch.
  * @return All HDFS files originally read.
  */
def extractSourceHDFSFiles(rdd: RDD[_]): Set[String] = {
  def extractSourceHDFSFilesWithAcc(rdd: List[RDD[_]]) : Set[String] = {
    rdd match {
      case Nil => Set()
      case head :: tail => {
        val name = head.toString()
        if (name.startsWith("hdfs")){
          Set(name.split(" ")(0)) ++ extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++ extractSourceHDFSFilesWithAcc(tail)
        }
        else {
          extractSourceHDFSFilesWithAcc(
head.dependencies.map(_.rdd).toList) 
++ extractSourceHDFSFilesWithAcc(tail)
        }
      }
    }
}

有了它,我至少可以记录哪些文件被处理过。然后,手动搜索未处理的。这很可怕,但这是我唯一的解决办法。
更新#2 09/24/2020:我注意到我可以使用 --conf spark.streaming.fileStream.minRememberDuration 让spark考虑旧的元数据文件,但它赢了´无法阻止它重新处理已处理的文件。我需要使用以前更新的信息去删除这些文件,以及迫使管理员在启动作业之前清理旧的元数据文件。。。
更新#3:09/30/2020我已经挖掘了fileinputdstream的源代码,我对检查点的作用有了更好的了解。它恢复被监视目录的“快照”以及哪些文件在什么时候出现。
然而,这里缺少的一点是,那些生成但从未处理的“文件集”(即批处理),如何在重新启动作业后恢复它们?

20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题