hadoop->mapper->如何从给定的输入路径中只读取每个文件的前n行?

pgky5nke  于 2021-06-04  发布在  Hadoop
关注(0)|答案(2)|浏览(360)

我是hadoop新手,我的要求是只需要处理每个输入文件的前10行。以及如何在读取每个文件的10行之后退出mapper。
如果有人能提供一些示例代码,这将是很大的帮助。
提前谢谢。

92vpleto

92vpleto1#

您可以重写Map器的run方法,一旦您将map循环迭代10次,就可以中断while循环。这将假定您的文件不可拆分,否则您将从每个拆分中获得前10行:

@Override
public void run(Context context) throws IOException, InterruptedException {
  setup(context);

  int rows = 0;
  while (context.nextKeyValue()) {
    if (rows++ == 10) {
      break;
    }

    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }

  cleanup(context);
}
pvabu6sv

pvabu6sv2#

假设n=10,那么我们可以使用下面的代码从下面的文件中只读取10条记录,如下所示:
第1行
线路2
.
.
.
第20行

//mapper
   class Mapcls extends Mapper<LongWritable, Text, Text, NullWritable> 
   {
    public void run(Context con) throws IOException, InterruptedException
    {
        setup(con);
        int rows = 0;
        while(con.nextKeyValue())
        {
            if(rows++ == 10)
            {
                break;
            }
            map(con.getCurrentKey(), con.getCurrentValue(), con);
        }

        cleanup(con);
     }

    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException
     {
        con.write(value, NullWritable.get());
     }
    }

    //driver
    public class Testjob extends Configured implements Tool
    {

     @Override
     public int run(String[] args) throws Exception 
     {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Test-job");
        job.setJobName("tst001");
        job.setJarByClass(getClass());

        job.setMapperClass(Mapcls.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception
      {
        int rc = ToolRunner.run(new Configuration(), new Testjob(), args);
        System.exit(rc);
      }
    }

然后输出为:
第1行
第10行
线路2
第3行
第4行
第5行
第6行
第7行
第8行
第9行

相关问题