java—如果HadoopMapReduce中至少有一个拆分失败,如何停止处理其他拆分或文件本身

b91juud3  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(271)

我有一个很大的csv文件,比如说2gb(或者说10k行)要用map reduce处理。知道每个块是128mb,我有16个块,因此有16个分割。由于它是一个文本文件,我可以让多个Map程序同时处理文件的不同部分(不同的拆分)。Map器计数默认为4,因此并行处理文件的4个部分
要求是,如果至少有一个剥离失败,我不想进一步处理该文件,或者不想将该文件的内容写入输出文件夹。也就是说,如果csv中至少有一行导致了错误,我不希望该文件被进一步处理,也不需要它的输出(这是因为我可以纠正错误并重新运行)
我该怎么做?
我已经延长了 FileInputFormat 覆盖 isSplittable 方法并返回true。如果我返回false,我知道只有一个Map程序将处理文件-但我担心太多的网络传输将发生完全处理文件。
我试过了 Counter ,但问题是如何让其他Map程序知道有人设置了一个计数器值,因为某些操作失败
任何提示都会有帮助。

ki0zmccv

ki0zmccv1#

Map绘制者是相互独立的,他们之间不可能有直接的交流。这么说来,没有直接的方式让mapper将失败传达给其他mapper。
如果您关心的是清理,那么可以在driver类中收集提交时作业的状态。

boolean done = job.waitForCompletion(true);

完成的值将是 false ,如果作业失败。如果它失败了(因为一个Map程序抛出了一个异常);只需清理输出目录,或者通过编程修复csv并重新运行。
编辑-基于op的评论
作业设置为运行到最后并优雅地完成。也就是说,所有异常都被捕获并记录,因此Map程序不会被杀死。这意味着,作业的完成状态将始终是成功的。这样做是为了确保mapper在处理坏文件时不会失败,而是继续处理好的文件,直到最后一个文件。
在这种情况下,可以使用计数器(在Map器中)来增加失败的计数。

context.getCounter("my_group", "bad_record").increment(1);

当作业完成时,只需获取驱动程序类中的计数器值,如果进程的计数为正,则将其标记为失败。

long value= job.getCounters().getGroup("my_group").findCounter("bad_record").getValue();

请注意上面代码中的空检查,以防没有坏记录和计数器完全不存在。
编辑-添加另一个可能的选项
通过下面的代码,可以从上下文中获取job的示例,并尝试在Map器中获取计数器值(并停止进一步处理该文件)。但我不确定,计数器是在执行过程中还是仅在执行结束后才可用。

Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job currentJob = cluster.getJob(context.getJobID());
long val=currentJob.getCounters().findCounter("bad_record").getValue();

我没有测试过。请试着告诉我。

相关问题