如何在map reduce程序中解析pdf文件?

0g0grzrc  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(304)

我想在我的hadoop 2.2.0程序中解析pdf文件,我发现了这个,按照它所说的,直到现在,我有以下三个类: PDFWordCount :包含map和reduce函数的主类(就像本机hadoop wordcount示例,但是 TextInputFormat 我用了我的 PDFInputFormat 班级。 PDFRecordReader extends RecordReader<LongWritable, Text> :这是这里的主要工作。尤其是我把我的 initialize 函数在这里进行更多说明。

public void initialize(InputSplit genericSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
      System.out.println("initialize");
      System.out.println(genericSplit.toString());
    FileSplit split = (FileSplit) genericSplit;
    System.out.println("filesplit convertion has been done");
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    System.out.println("fs has been opened");
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("going to open split");
    FSDataInputStream filein = fs.open(split.getPath());
    System.out.println("going to load pdf");
    PDDocument pd = PDDocument.load(filein);
    System.out.println("pdf has been loaded");
    PDFTextStripper stripper = new PDFTextStripper();
    in =
    new LineReader(new ByteArrayInputStream(stripper.getText(pd).getBytes(
        "UTF-8")));
    start = 0;
    this.pos = start;
    System.out.println("init has finished");
  }

(你可以看到我的 system.out.println 用于调试。此方法转换失败 genericSplitFileSplit . 我在控制台中看到的最后一件事是:

hdfs://localhost:9000/in:0+9396432

哪个是
genericSplit.toString() PDFInputFormat extends FileInputFormat<LongWritable, Text> :它只是创建 new PDFRecordReadercreateRecordReader 方法。
我想知道我犯了什么错?
我需要额外的课程吗?

kxeu7u2r

kxeu7u2r1#

package com.sidd.hadoop.practice.pdf;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sidd.hadoop.practice.input.pdf.PdfFileInputFormat;
import com.sidd.hadoop.practice.output.pdf.PdfFileOutputFormat;

public class ReadPdfFile {

    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
//          context.progress();
            context.write(key, value);
        }
    }

    public static class MyReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        public void reduce(LongWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            if (values.iterator().hasNext()) {
                context.write(key, values.iterator().next());
            } else {
                context.write(key, new Text(""));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "Read Pdf");
        job.setJarByClass(ReadPdfFile.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(PdfFileInputFormat.class);
        job.setOutputFormatClass(PdfFileOutputFormat.class);

        removeDir(args[1], conf);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void removeDir(String path, Configuration conf) throws IOException {
        Path output_path = new Path(path);
        FileSystem fs = FileSystem.get(conf);

        if (fs.exists(output_path)) {
            fs.delete(output_path, true);
        }
    }   

}
tzcvj98z

tzcvj98z2#

阅读PDF并不是那么困难,您需要扩展fileinputformat类以及recordreader。fileinputclass不能分割pdf文件,因为它们是二进制文件。

public class PDFInputFormat extends FileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
      return new PDFLineRecordReader();
  }

  // Do not allow to ever split PDF files, even if larger than HDFS block size
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

}

然后recordreader自己执行读取(我使用pdfbox读取PDF)。

public class PDFLineRecordReader extends RecordReader<Text, Text> {

private Text key = new Text();
private Text value = new Text();
private int currentLine = 0;
private List<String> lines = null;

private PDDocument doc = null;
private PDFTextStripper textStripper = null;

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {

    FileSplit fileSplit = (FileSplit) split;
    final Path file = fileSplit.getPath();

    Configuration conf = context.getConfiguration();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream filein = fs.open(fileSplit.getPath());

    if (filein != null) {

        doc = PDDocument.load(filein);

        // Konnte das PDF gelesen werden?
        if (doc != null) {
            textStripper = new PDFTextStripper();
            String text = textStripper.getText(doc);

            lines = Arrays.asList(text.split(System.lineSeparator()));
            currentLine = 0;

        }

    }
}

    // False ends the reading process
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {

    if (key == null) {
        key = new Text();
    }

    if (value == null) {
        value = new Text();
    }

    if (currentLine < lines.size()) {
        String line = lines.get(currentLine);

        key.set(line);

        value.set("");
        currentLine++;

        return true;
    } else {

        // All lines are read? -> end
        key = null;
        value = null;
        return false;
    }
}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return (100.0f / lines.size() * currentLine) / 100.0f;
}

@Override
public void close() throws IOException {

    // If done close the doc
    if (doc != null) {
        doc.close();
    }

}

希望这有帮助!

相关问题