基于java的hadoop多节点编程

6ie5vjzr  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(188)

我是hadoop的新手,我希望每次运行都使用2、4、6个节点来分割要发送到Map器的数据集。但是我写的代码不能正常工作。实际上,它适用于2个节点,但随着节点数的增加,输出文件中丢失的一些输出数据也会随之增加。你能帮帮我吗?谢谢您
代码如下:

public static void main(String[] args) throws Exception {

        System.out.println("MapReduce Started at:"+System.currentTimeMillis());
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        int numOfNodes = 2;  

        Job job = new Job(conf, "calculateSAAM"); 
        job.setJarByClass(calculateSAAM.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path("/home/helsene/wordcount/input"));
        String outputFile = "/home/helsene/wordcount/output/";

        long dataLength = fs.getContentSummary(new Path(outputFile)).getLength();
        FileInputFormat.setMaxInputSplitSize(job, (dataLength / numOfNodes));

        job.setNumReduceTasks(numOfNodes/2);
        Path outPath = new Path(outputFile);

        fs.delete(outPath, true);
        FileOutputFormat.setOutputPath(job, new Path(outputFile)); 

        job.waitForCompletion(true);
        System.out.println("MapReduce ends at:"+System.currentTimeMillis());
        }        
    }
n3h0vuf2

n3h0vuf21#

每个reducer生成一个输出文件,默认情况下命名为 part-xx ( part-00000 对于第一个减速器, part-00001 用于第二个减速器等)。
在您的代码中,当您有3个以上的节点时,您将有多个缩减器,因此输出数据将被拆分为多个部分(多个文件)。这意味着一些字数将在第一个文件(part-00000)中,一些字数将在第二个文件(part-00001)中,等等。您可以稍后通过调用getmerge命令来合并这些部分,如:

hadoop dfs -getmerge /HADOOP/OUTPUT/PATH /local/path/

并在指定的本地路径中获取一个包含所有部分文件的合并结果的文件。当您有两个节点,因此2/2=1 reducer(生成一个输出文件)时,该文件将具有与您得到的文件相同的结果。
顺便说一下,将减速器的数量设置为 numOfNodes/2 可能不是最好的选择。更多详情请参见此帖子。

相关问题