如果从多个mapr位置读取数据,spark shuffle是如何工作的?

8aqjt8rx  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(382)

我在mapr集群中有两个位置,我的spark工作是从这两个端点加载数据。其中一个端点拥有大量数据,而另一个端点则相对较少。现在,当我像一个 .reduceByKey 或者 .groupByKey 出现异常:

"java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.close(Output.java:196)
    at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:255)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:293)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1368)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:292)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:810)
    at org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
    at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
"

现在,如果我将数据从一个位置复制到另一个位置,然后执行shuffle操作,则不会出现任何超时异常。为什么会有这样的行为?根据我的理解,洗牌操作将发生在rdd上,所以不管它从n个位置读取数据,它的行为应该是相似的。
如果我的理解有误,请纠正我。

uurv41yg

uurv41yg1#

这个问题的某些方面相当令人困惑。
首先,您指的是在mapr集群中的“2个位置”中拥有数据。你是说两个目录中的数据吗?或者你真的是说两个地方?或者你的意思是你实际上有两个簇?
另一个让人困惑的地方是,您显示了内存不足问题的堆栈跟踪,但随后会讨论某种类型的超时。你到底有什么问题?
一般来说,内存不足异常不太依赖于数据源,但与资源的过度投入有很大关系。如果你有很多任务使用的内存或cpu比它们少很多,但是你可能有一些任务使用的更多,那么过度承诺是有用的。过度投入资源可以让您安排此类程序,但如果多个大任务落在同一台机器上,则可能会导致严重问题。这也会导致一些随机行为,因此如果你不小心验证你的理论,你很容易跳到错误的结论。
在Yarn下,通常依赖于过度使用。如果你在kubernetes上运行,通常会有更高的精度。
不幸的是,在这个问题上没有更多的细节/一致性,这样的概括是最好的。

相关问题