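I throw a custom exception to test failure handling in my Structured Streaming job, as shown below. I can see that the query gets terminated, but I can't understand why the driver script doesn't fail with a non-zero exit code.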
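import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

streamingDF.writeStream
  .trigger(Trigger.ProcessingTime(10000L))
  .foreachBatch {
    (batchDF: DataFrame, batchId: Long) => {
      val transformedDF: DataFrame = DoSomeProcessing(batchDF)
      // Deliberately fail on batch 1 to test how the job handles errors
      if (batchId == 1) {
        throw new Exception("Custom Exception as batchId is 1")
      }
      // ...
    }
  }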
I get the following trace on the console, but the driver script doesn't exit and no new logs are printed to the console.
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Custom Exception as batchId is 1
=== Streaming Query ===
Identifier: [id = 6f4c3b4c-bc30-46fe-93ef-8378c23380ab, runId = 1241cb37-493b-4882-ab28-9df8a8c6fb1a]
Current Committed Offsets: ...
Current Available Offsets: ...
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
RepartitionByExpression [timestamp#12], 10
...
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.Exception: Custom Exception as batchId is 1
at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:61)
at MySteamingApp$$anonfun$startSparkStructuredStreaming$1.apply(MySteamingApp.scala:57)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
1 Answer
I think the number of allowed task failures is configured to be higher than one.
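spark.task.maxFailures (default: 4) is the number of failures of any particular task before giving up on the job. The total number of failures spread across different tasks will not cause the job to fail; a particular task has to fail this number of attempts. It should be greater than or equal to 1. Number of allowed retries = this value - 1.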
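If you want to experiment with that setting, here is a minimal sketch of lowering it when the session is built. The object name and app name are made up for illustration, and I'm assuming the master URL is supplied by spark-submit rather than in code.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, for illustration only.
object MaxFailuresSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MaxFailuresSketch") // hypothetical app name
      // 1 attempt per task, so allowed retries = 1 - 1 = 0
      .config("spark.task.maxFailures", "1")
      .getOrCreate() // assumes the master URL comes from spark-submit

    // ... build and start the streaming query here ...

    spark.stop()
  }
}
```

Note that this setting has to be in place before the SparkContext is created; changing it on an already running session is not expected to have an effect.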
Also take a look at the related question: Is there a way to dynamically stop Spark Structured Streaming?
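As for the driver not exiting, one option (a sketch under assumptions, not something confirmed against your setup) is to catch the StreamingQueryException that awaitTermination() rethrows, stop the session, and return an exit code yourself. The object and method names below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical helper, for illustration only.
object StopOnFailureSketch {
  // Blocks until the query ends, stops the session, and returns
  // the exit code the driver should use (0 = clean stop, 1 = failure).
  def awaitAndStop(spark: SparkSession, query: StreamingQuery): Int = {
    try {
      // Rethrows the query's failure as a StreamingQueryException
      query.awaitTermination()
      0
    } catch {
      case e: StreamingQueryException =>
        System.err.println(s"Streaming query failed: ${e.getMessage}")
        1
    } finally {
      // Shut Spark down whether the query failed or not
      spark.stop()
    }
  }
}
```

In the driver's main method you would then end with something like sys.exit(StopOnFailureSketch.awaitAndStop(spark, query)), where query is the value returned by .start(). Calling sys.exit explicitly makes the exit code independent of any threads that might otherwise keep the JVM alive.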