增量合并中出现spark非描述性错误

ny6fqffe  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(375)

我正在使用databricks(databricks runtime 8)中的spark3.1和一个非常大的集群(25个worker,拥有112gb内存,每个内核16个)来复制azure数据湖存储(adls gen2)中的几个sap表。为此,一个工具将所有这些表的delta写入一个中间系统(sqlserver),然后,如果我有某个表的新数据,我将执行databricks作业,将新数据与adls中可用的现有数据合并。
这个过程对于大多数表来说都很好,但是其中一些表(最大的表)需要花费大量时间进行合并(我使用每个表的pk来合并数据),而最大的一个表从一周前(当生成表的一个大增量时)就开始失败了。我在作业中看到的错误跟踪:
py4jjavaerror:调用o233.sql时出错:org.apache.spark.sparkexception:作业已中止。位于org.apache.spark.sql.execution.datasources.fileformatwriter$.write(fileformatwriter。scala:234)在com.databricks.sql.transaction.tahoe.files.transactionalwritedge.$anonfun$writefiles$5(transactionalwritedge)。scala:246) ... .. ............................................................................................................................................................................................................................................................................................................................................................................ 引起原因:org.apache.spark.sparkexception:在awaitresult中引发异常:位于org.apache.spark.util.threadutils$.awaitresult(threadutils)。scala:428)在com.databricks.sql.transaction.tahoe.perf.deltaoptimizedwriterexec.awaitshufflemapstage$1(deltaoptimizedwriterexec。scala:153)在com.databricks.sql.transaction.tahoe.perf.deltaoptimizedwriterexec.getshufflestats(deltaoptimizedwriterexec。scala:158)在com.databricks.sql.transaction.tahoe.perf.deltaoptimizedwriterexec.computebins(deltaoptimizedwriterexec。scala:106)在com.databricks.sql.transaction.tahoe.perf.deltaoptimizedwriterexec.doexecute(deltaoptimizedwriterexec。scala:174)在org.apache.spark.sql.execution.sparkplan.$anonfun$execute$1(sparkplan。scala:196)在org.apache.spark.sql.execution.sparkplan.$anonfun$executequery$1(sparkplan。scala:240)在org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope。scala:165)在org.apache.spark.sql.execution.sparkplan.executequery(sparkplan。scala:236)在org.apache.spark.sql.execution.sparkplan.execute(sparkplan。scala:192)位于org.apache.spark.sql.execution.datasources.fileformatwriter$.write(fileformatwriter。scala:180) ... 141更多原因:org.apache.spark.sparkexception:由于阶段失败而中止作业:shufflemapstage 68(在deltaoptimizedwriterexec执行)。scala:97)已超过允许的最大失败次数:4。最近的失败原因:org.apache.spark.shuffle.fetchfailedexception:connection from/.xx.xx.xx:4048 closed at org.apache.spark.storage.shuffleblockfetcheriterator.throwfetchfailedexception(shuffleblockfetcheriterator)。scala:769)在org.apache.spark.storage.shuffleblockfetcheriterator.next(shuffleblockfetcheriterator。scala:684)在org.apache.spark.storage.shuffleblockfetcheriterator.next(shuffleblockfetcheriterator。scala:69)在.................................................................................................................................................................................................................................................................................................................................... ... java.lang.thread.run(线程。java:748)原因:java.io.ioexception:connection from/.xx.xx.xx:4048 closed at org.apache.spark.network.client.transportresponsehandler.channelinactive(transportresponsehandler)。java:146)位于org.apache.spark.network.server.transportchannelhandler.channelinactive(transportchannelhandler)。java:117)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:262)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:248)在io.netty.channel.abstractchannelhandlercontext.firechannelinactive(abstractchannelhandlercontext。java:241)在io.netty.channel.channelinboundhandleradapter.channelinactive(channelinboundhandleradapter。java:81)在io.netty.handler.timeout.idlestatehandler.channelinactive(idlestatehandler。java:277)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:262)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:248)在io.netty.channel.abstractchannelhandlercontext.firechannelinactive(abstractchannelhandlercontext。java:241)在io.netty.channel.channelinboundhandleradapter.channelinactive(channelinboundhandleradapter。java:81)在org.apache.spark.network.util.transportframedecoder.channelinactive(transportframedecoder)。java:225)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:262)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:248)在io.netty.channel.abstractchannelhandlercontext.firechannelinactive(abstractchannelhandlercontext。java:241)在io.netty.channel.defaultchannelpipeline$headcontext.channelinactive(defaultchannelpipeline。java:1405)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:262)在io.netty.channel.abstractchannelhandlercontext.InvokeChannelActive(abstractchannelhandlercontext。java:248)在io.netty.channel.defaultchannelpipeline.firechannelinactive(defaultchannelpipeline。java:901)在io.netty.channel.abstractchannel$abstractunsafe$8.run(abstractchannel。java:818)在io.netty.util.concurrent.abstracteventexecutor.safeexecute(abstracteventexecutor。java:164)在io.netty.util.concurrent.singlethreadeventexecutor.runalltasks(singlethreadeventexecutor。java:472)在io.netty.channel.nio.nioeventloop.run(nioeventloop。java:497)在io.netty.util.concurrent.singlethreadeventexecutor$4.run(singlethreadeventexecutor。java:989)在io.netty.util.internal.threadexecutormap$2.run(threadexecutormap。java:74)在io.netty.util.concurrent.fastthreadlocalrunnable.run(fastthreadlocalrunnable。java:30) ... 还有1个
由于错误是非描述性的,我查看了每个执行器日志,并看到以下消息:
2007年4月21日09:11:24错误oneforoneblockfetcher:启动块获取java.io时失败。ioexception:来自/.xx.xx.xx的连接:4048已关闭
在似乎无法连接的executor中,我看到以下错误消息:
21/04/06 09:30:46错误sparkthreadlocapturingrunnable:线程任务reaper-7 org.apache.spark.sparkexception中出现异常:正在终止executor jvm,因为终止的任务5912无法在60000毫秒内停止。位于org.apache.spark.executor.executor$taskreaper.run(executor)。scala:1119)在org.apache.spark.util.threads.sparkthreadlocapturingrunnable.$anonfun$run$1(sparkthreadlocalforwardingthreadpoolexecutor。scala:104)在scala.runtime.java8.jfunction0$mcv$sp.apply(jfunction0$mcv$sp。java:23)位于org.apache.spark.util.threads.sparkthreadlocapturinghelper.runwithcaptured(sparkthreadlocalforwardingthreadpoolexecutor)。scala:68)在org.apache.spark.util.threads.sparkthreadlocapturinghelper.runwithcaptured$(sparkthreadlocalforwardingthreadpoolexecutor。scala:54)位于org.apache.spark.util.threads.sparkthreadlocapturingrunnable.runwithcaptured(sparkthreadlocalforwardingthreadpoolexecutor)。scala:101)在org.apache.spark.util.threads.sparkthreadlocapturingrunnable.run(sparkthreadlocalforwardingthreadpoolexecutor。scala:104)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:624)在java.lang.thread.run(
我尝试过增加默认的shuffle并行性(从这里建议的200增加到1200,spark-application-kills-executor),看起来这个作业的执行时间更长,但它再次失败。
我试着在执行任务的同时监视斯巴库人:

但正如您所看到的,问题是相同的:有些阶段失败是因为执行器无法访问,因为任务失败了x次以上。
我上面提到的大三角洲大约有40-50亿行,我想合并的大垃圾场大约有1亿行。这个表还没有分区,所以这个过程需要大量的工作。失败的是合并部分,而不是将数据从sqlserver复制到adls的过程,因此,一旦要合并的数据已经是parquet格式,就要进行合并。
你知道发生了什么,或者我能做些什么来完成这个合并吗?
提前谢谢。

fhity93d

fhity93d1#

最后,我检查了集群,并将spark.sql.shuffle.partitions属性更改为1600,这是我希望使用此配置执行的作业的代码(而不是直接在集群上更改此属性)。在我的集群中,我有400个核心,所以我选择了这个数字的倍数(1600)。
在那之后,两个小时内执行完毕。我得出这个结论是因为,在我的日志和spark ui中,我观察到大量磁盘溢出,所以我认为分区不适合工作节点。

相关问题