我正在做一个超过100个RDD的联合,累计需要几GB。最后,我想做一个 count
,并计算我在所有这些文件中有多少个唯一的字符串。但是,齐柏林飞艇出现了一个内存错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 8263 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
... 55 elided
现在,我知道 count
是一个操作,但不应该优化为不在本地提取所有数据,并以分布式方式计算计数吗?我错过了什么?
注意:我没有使用任何 collect
-s、 仅限 select
, distinct
, map
以及 union
当涉及到转换时 count
从行动中。
更新:
所以我花了一些时间调查和谷歌搜索如何解决这个问题。基本上,我发现了两个帮助我的改进:
我发现使用RDD代替数据集是非常不理想的,如果没有超具体的原因为什么有人会转换。这只是一个变化,几乎只是删除了 .rdd
呼叫,解决了记忆问题。然而,由于我犯了太多的错误,我又犯了一个有点奇怪的错误 union
s。
调查这一点,我发现我可以加载一堆文件一次与regex在一起 load
方法,从而避免这样做 union
所有这些数据集。所以从10行代码中,我得到了一个 load(pathRegex).map(...).distinct.count()
.
暂无答案!
目前还没有任何答案,快来回答吧!