使用线程池时hadoop“filesystem closed”异常

zphenhs4  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(334)

我对hadoop相当陌生,我在一个5节点集群上运行多个mapreduce作业。当运行多个线程时,我开始出现“filesystem closed”异常。当一次运行一个作业时,这些作业工作正常。错误出现在Map之后,就在还原之前。看起来是这样的:

java.lang.Exception: java.io.IOException: Filesystem closed
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

这种情况不会一直发生,如果我重新执行失败的作业,它将正常运行。不幸的是,这占用了太多时间。我假设这与多个任务访问同一个输入文件有关,当一个任务完成时,它将关闭所有任务的输入文件。如果这是一个问题,我想知道的是如何覆盖这个。我尝试在Map器中覆盖清理以重新打开路径,但这似乎很愚蠢,而且不起作用。

@Override 
public void cleanup(Context context){
        Job tempJob;
        try {
            tempJob = new Job();
            Path fs = ((FileSplit) context.getInputSplit()).getPath();
            FileInputFormat.addInputPath(tempJob, fs);
            System.out.println("Finished map task for " + context.getJobName());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

我还想知道这是否是使用线程池执行hadoopmapreduce作业的根本问题。谢谢你的建议。
编辑:当我提到工作和任务时,我可能有点不清楚。实际上,我用自己的Map器和还原器运行多个作业。每个作业都将为我正在创建的特定表生成一列。说一个总数或一个计数。每个作业都有自己的线程,它们都在访问同一个输入文件。我遇到的问题是,当一些作业完成时,它们会抛出“filesystem closed exception”。我也使用Yarn,如果这可能会有所不同。

rryofs0p

rryofs0p1#

作为一般规则,除非您有一个非常cpu密集型的作业,否则我不建议在同一个任务中使用多个线程,这会增加jvm中出现问题的可能性,并且重新运行任务的成本要高得多。您可能应该考虑增加map任务的数量,当然每个任务都将在一个单独的jvm中运行,但是这样做会更干净。
如果您真的想采用多线程方式,那么我怀疑您使用了错误的Map器类型,对于多线程应用程序,您应该使用 MultithreadedMapper 它有一个不同的实现 run 方法,并且应该是线程安全的。你可以这样使用它:

job.setMapperClass(MultithreadedMapper.class);

可以这样指定线程数:

int numThreads = 42;
MultithreadedMapper.setNumberOfThreads(numThreads);

相关问题