//Driver Class
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException 
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();
        Job job = new Job(conf, "Pdftext");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));


package com.tcs;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<Object, Object> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public RecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {

        return new PdfRecordReader();


//PDF Record Reader class
package com.tcs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<Object, Object> {

    private String[] lines = null;
    private LongWritable key = null;
    private Text value = null;

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();

         * The below code contains the logic for opening the file and seek to
         * the start of the split. Here we are applying the Pdf Parsing logic

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        PDDocument pdf = null;
        String parsedText = null;
        PDFTextStripper stripper;
        pdf = PDDocument.load(fileIn);
        stripper = new PDFTextStripper();
        parsedText = stripper.getText(pdf);
        //String delims = "[ ]";
        this.lines = parsedText.split("/n");

    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (key == null) {
            key = new LongWritable();
            value = new Text();
        } else 
            int temp = (int) key.get();
            if (temp < (lines.length - 1)) {
                int count = (int) key.get();
                value = new Text();
                count = count + 1;
                key = new LongWritable(count);
            } else {
                return false;

        if (key == null || value == null) {
            return false;
        } else {
            return true;

    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {

        return key;

    public Text getCurrentValue() throws IOException, InterruptedException {

        return value;

    public float getProgress() throws IOException, InterruptedException {

        return 0;

    public void close() throws IOException {



//Mapper Class
package com.tcs;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>

    protected void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {

    context.write(value, key);

//Reducer Class
package com.tcs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Object, Object, Object, Object> {
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {

            context.write(key, new WordCountReducer());


