Scala — why does my Spark Structured Streaming job not terminate even after an exception is raised?

hiz5n14c, posted 2021-05-27 in Spark

I raised a custom exception to test failure handling in my Structured Streaming job, as shown below. I can see that the query gets terminated, but I cannot understand why the driver script does not fail with a non-zero exit code.

streamingDF.writeStream
        .trigger(Trigger.ProcessingTime(10000L))
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          val transformedDF: DataFrame = DoSomeProcessing(batchDF)
          // Fail deliberately on the second micro-batch to test error handling
          if (batchId == 1) {
            throw new Exception("Custom Exception as batchId is 1")
          }
        }
        .start()
        .awaitTermination()

I get the following trace on the console, but the driver script does not exit and no new logs are printed to the console.

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Custom Exception as batchId is 1
=== Streaming Query ===
Identifier: [id = 6f4c3b4c-bc30-46fe-93ef-8378c23380ab, runId = 1241cb37-493b-4882-ab28-9df8a8c6fb1a]
Current Committed Offsets: ...
Current Available Offsets: ...

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
RepartitionByExpression [timestamp#12], 10
...
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.Exception: Custom Exception as batchId is 1
    at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:61)
    at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:57)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
Answer 1 (vaqhlq81):

I think the number of allowed task failures is configured to a higher value here. spark.task.maxFailures (default 4) is the number of failures of any particular task before giving up on the job: the total number of failures spread across different tasks will not cause the job to fail; a particular task has to fail this number of attempts. It should be greater than or equal to 1, and the number of allowed retries equals this value minus 1.
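
As a minimal sketch of that idea, the threshold can be lowered when the session is created so a repeatedly failing task surfaces sooner. The config key spark.task.maxFailures is real; the app name (taken from the stack trace) and the value of 1 are illustrative:

    import org.apache.spark.sql.SparkSession

    // Lower the per-task failure threshold: a task that keeps failing now
    // aborts its stage after a single attempt instead of the default four.
    // This must be set before the SparkContext starts.
    val spark = SparkSession.builder()
      .appName("MySteamingApp")                // name as seen in the stack trace
      .config("spark.task.maxFailures", "1")   // illustrative value
      .getOrCreate()

The same setting can also be passed on the command line with --conf spark.task.maxFailures=1 when launching through spark-submit.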
Further, take a look at whether there is a way to stop Spark Structured Streaming dynamically; a sketch along those lines follows.
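
For the original symptom (the driver staying alive after the query dies), here is a minimal sketch assuming the query handle from the question's snippet: awaitTermination() rethrows a batch failure on the driver wrapped in StreamingQueryException, so catching it there makes it possible to stop the query and exit with a non-zero code. streamingDF and DoSomeProcessing are the question's own names.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}

    // Start the query and keep the handle so it can be stopped explicitly.
    val query = streamingDF.writeStream
      .trigger(Trigger.ProcessingTime(10000L))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        DoSomeProcessing(batchDF)
      }
      .start()

    try {
      // Blocks until the query stops; a failure inside foreachBatch is
      // rethrown here as a StreamingQueryException.
      query.awaitTermination()
    } catch {
      case e: StreamingQueryException =>
        System.err.println(s"Streaming query failed: ${e.getMessage}")
        query.stop()   // ensure the stream is shut down
        sys.exit(1)    // fail the driver with a non-zero exit code
    }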
