Shuffle, merge, and fetcher errors when processing large files in Hadoop

yacmzcpb · posted 2021-06-02 · in Hadoop

I am running a wordcount-like MapReduce job that processes 200 files of 1 GB each. The job runs on a Hadoop cluster with 4 datanodes (2 CPUs each), 8 GB of memory, and about 200 GB of space. I have tried various configuration options, but every time the job fails with an in-memory shuffle, on-disk shuffle, in-memory merge, on-disk merge, or fetcher error.
The size of the mapper output is comparable to the size of the input files, so to minimize the mapper output I use bzip2 compression for the MapReduce output. Even with compressed map output, however, I still get errors in the reducer phase. I use 4 reducers. I have therefore tried various configurations of the Hadoop cluster.
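For reference, this is roughly how I enable the compression (a sketch using the stock wordcount example jar and made-up input/output paths in place of my real job; it assumes the driver parses generic options so the -D properties take effect):

# Map-output (and final-output) compression as I have it enabled;
# jar location and HDFS paths below are placeholders for my actual setup
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount \
    -D mapreduce.map.output.compress=true \
    -D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.BZip2Codec \
    -D mapreduce.output.fileoutputformat.compress=true \
    -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec \
    /input/200x1gb /output/wordcount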
The standard configuration of the cluster is:

    Default virtual memory for a job's map-task    3328 Mb
    Default virtual memory for a job's reduce-task  6656 Mb
    Map-side sort buffer memory 205 Mb
    Mapreduce Log Dir Prefix    /var/log/hadoop-mapreduce
    Mapreduce PID Dir Prefix    /var/run/hadoop-mapreduce
    yarn.app.mapreduce.am.resource.mb   6656
    mapreduce.admin.map.child.java.opts -Djava.net.preferIPv4Stack=TRUE -Dhadoop.metrics.log.level=WARN
    mapreduce.admin.reduce.child.java.opts  -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
    mapreduce.admin.user.env LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/`$JAVA_HOME/bin/java -d32 -version &> /dev/null;if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`
    mapreduce.am.max-attempts   2
    mapreduce.application.classpath $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
    mapreduce.cluster.administrators    hadoop
    mapreduce.framework.name    yarn
    mapreduce.job.reduce.slowstart.completedmaps   0.05
    mapreduce.jobhistory.address    ip-XXXX.compute.internal:10020
    mapreduce.jobhistory.done-dir   /mr-history/done
    mapreduce.jobhistory.intermediate-done-dir  /mr-history/tmp
    mapreduce.jobhistory.webapp.address ip-XXXX.compute.internal:19888
    mapreduce.map.java.opts -Xmx2662m
    mapreduce.map.log.level INFO
    mapreduce.map.output.compress   true
    mapreduce.map.sort.spill.percent    0.7
    mapreduce.map.speculative   false
    mapreduce.output.fileoutputformat.compress  true
    mapreduce.output.fileoutputformat.compress.type BLOCK
    mapreduce.reduce.input.buffer.percent   0.0
    mapreduce.reduce.java.opts  -Xmx5325m
    mapreduce.reduce.log.level  INFO
    mapreduce.reduce.shuffle.input.buffer.percent 0.7
    mapreduce.reduce.shuffle.merge.percent  0.66
    mapreduce.reduce.shuffle.parallelcopies 30
    mapreduce.reduce.speculative    false
    mapreduce.shuffle.port  13562
    mapreduce.task.io.sort.factor   100
    mapreduce.task.timeout  300000
    yarn.app.mapreduce.am.admin-command-opts    -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
    yarn.app.mapreduce.am.command-opts  -Xmx5325m
    yarn.app.mapreduce.am.log.level INFO
    yarn.app.mapreduce.am.staging-dir   /user
    mapreduce.map.maxattempts       4
    mapreduce.reduce.maxattempts        4
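As a rough back-of-the-envelope (my own arithmetic, based on how the MR2 merge manager derives its limits from the reducer heap, so treat the numbers as approximate), these settings imply:

# Approximate reduce-side shuffle memory implied by the settings above:
#   in-memory shuffle buffer ≈ 5325 MB (-Xmx) * 0.70 (shuffle.input.buffer.percent) ≈ 3730 MB
#   in-memory merge triggers ≈ 3730 MB * 0.66 (shuffle.merge.percent)               ≈ 2460 MB
#   reduce.input.buffer.percent is 0.0, so everything gets spilled and merged on the
#   local disks before the reduce phase starts, which makes local-dir space matter a lot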

This configuration results in the following error:

14/05/16 20:20:05 INFO mapreduce.Job:  map 20% reduce 3%
14/05/16 20:27:13 INFO mapreduce.Job:  map 20% reduce 0%
14/05/16 20:27:13 INFO mapreduce.Job: Task Id : attempt_1399989158376_0049_r_000000_0,      Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in InMemoryMerger - Thread to merge in-memory shuffled map-outputs
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
 Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1399989158376_0049_r_000000_0/map_2038.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$InMemoryMerger.merge(MergeManagerImpl.java:450)
    at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)

I then tried tweaking various options to reduce the load in the shuffle phase, but I got the same error:

mapreduce.reduce.shuffle.parallelcopies     5
mapreduce.task.io.sort.factor   10

mapreduce.reduce.shuffle.parallelcopies     10
mapreduce.task.io.sort.factor   20

Then I realized that the tmp files on my datanodes did not exist, so all the merging and shuffling was happening in memory. I therefore added them manually on each datanode. I kept the initial configuration but increased the delay before the reducers start, to limit the load on the datanodes:

mapreduce.job.reduce.slowstart.completedmaps 0.7

I also tried increasing io.sort.mb:

mapreduce.task.io.sort.mb from 205 to 512.

But now I get the following on-disk error:

14/05/26 12:17:08 INFO mapreduce.Job:  map 62% reduce 21%
14/05/26 12:20:13 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_0, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in OnDiskMerger - Thread to merge on-disk map-outputs
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for hadoop/yarn/local/usercache/eoc21/appcache/application_1400958508328_0021/output/attempt_1400958508328_0021_r_000000_0/map_590.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$OnDiskMerger.merge(MergeManagerImpl.java:536)
at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)

The reducer dropped to 0%, and when it got back up to 17% I got the following error:

14/05/26 12:32:03 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_1, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#22
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
at org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput.<init>(OnDiskMapOutput.java:61)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:257)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)

From reading around, it seems that "Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out" is related to the node not having enough space to spill to. But I checked the datanodes and there seems to be enough space:

Filesystem      Size  Used Avail Use% Mounted on
/dev/xvde1       40G   22G   18G  56% /
none            3.6G     0  3.6G   0% /dev/shm
/dev/xvdj      1008G  758G  199G  80% /hadoop/hdfs/data

So I am not sure what to try next. Is the cluster simply too small to handle this kind of job? Do I need more space on the datanodes? Is there a way to find an optimal configuration for a job on Hadoop? Any suggestions would be greatly appreciated!


0x6upsns1#

This could be one of four things I know of; most likely it is the disk-space point you raise in the question, or its close relative, inodes:
The file was deleted by another process (unlikely, unless you remember doing it yourself)
A disk error (unlikely)
Running out of disk space
Running out of inodes (run df -i)
Even if you run df -h and df -i before/after the job, you don't know how much gets eaten up and cleaned away in the middle of the job. So while your job is running, it's a good idea to watch these numbers, log them to a file, graph them, etc., for example:

watch "df -h && df -i"
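If you'd rather have a record you can inspect after the job has died, a simple loop like this writes the same numbers to a file (the filename is just an example):

# Append a timestamped disk-space and inode snapshot every 60 seconds while the job runs
while true; do
    date
    df -h
    df -i
    sleep 60
done >> /tmp/disk-usage-during-job.log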

s8vozzvw2#

You need to specify some temporary directories in which the intermediate map and reduce output is stored. It may be that you have not specified any temporary directories, so no valid directory can be found for the intermediate data. You can do this by editing mapred-site.xml:

<property>
  <name>mapred.local.dir</name>
  <value>/temp1,/temp2,/temp3</value>
</property>

This is a comma-separated list of paths on the local filesystem where temporary MapReduce data is written. Multiple paths help spread disk I/O.
Once these temporary directories are specified, the intermediate map and reduce output is stored by choosing among them in one of the following ways:
random: the intermediate data of the reduce task is stored at a randomly chosen location.
max: the intermediate data of the reduce task is stored at the location with the most available space.
roundrobin: mappers and reducers pick disks via round-robin scheduling at the job level across the available local disks; the job ID is used to create unique subdirectories on the local disks for each job's intermediate data.
You can set this behaviour with the following property in mapred-site.xml, for example:

<property>
  <name>mapreduce.job.local.dir.locator</name>
  <value>max</value>
</property>

In Hadoop, the default is roundrobin.
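Whatever directories you end up configuring, it's worth a quick check from the shell that they actually exist and are writable by the user that runs the NodeManager before re-running the job (a rough sketch; adjust the paths and the user name to your setup):

# Sanity-check that each configured local dir exists, is writable by the yarn user, and has space
for d in /temp1 /temp2 /temp3; do
    sudo -u yarn test -w "$d" && echo "$d is writable" || echo "$d is NOT writable"
    df -h "$d"
done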
