How to insert MapReduce output into HBase using the same program

tvokkenx · posted 2021-06-01 in Hadoop

I wrote a program that takes a PDF as input and produces its text as output. I would like to use the same program to load that text into HBase. Is there any way to do this? Any help is appreciated.

//Driver Class
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
    {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();
        Job job = Job.getInstance(conf, "Pdftext");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(PdfInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        System.out.println(job.waitForCompletion(true));
    }
}

//InputFormatClass
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // A PDF has to be parsed as a whole, not from an arbitrary byte offset.
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {

        return new PdfRecordReader();
    }

}

//PDF Record Reader class
package com.tcs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private String[] lines = null;
    private LongWritable key = null;
    private Text value = null;
    private int position = 0;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();

        /*
         * Open the file backing this split and extract its text with PDFBox.
         * The whole PDF is parsed in one pass and split into lines, which the
         * reader then serves up one record at a time.
         */

        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        PDDocument pdf = PDDocument.load(fileIn);
        PDFTextStripper stripper = new PDFTextStripper();
        String parsedText = stripper.getText(pdf);
        pdf.close();
        fileIn.close();
        // Split on newlines ("\n", not "/n") so each extracted line becomes a record.
        this.lines = parsedText.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        // Serve one extracted line per call, keyed by its 1-based line number.
        if (lines == null || position >= lines.length) {
            return false;
        }
        key = new LongWritable(position + 1);
        value = new Text(lines[position]);
        position++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {

        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {

        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {

        // Report the fraction of extracted lines emitted so far.
        if (lines == null || lines.length == 0) {
            return 0.0f;
        }
        return (float) position / lines.length;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release here: the PDF and its stream are closed in initialize().
    }

}

//Mapper Class
package com.tcs;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit each extracted line as the key and its line number as the value.
        context.write(value, key);
    }
}

//Reducer Class
package com.tcs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // Count how many times each extracted line occurs and emit the line with its total.
        long count = 0;
        for (LongWritable ignored : values) {
            count++;
        }
        context.write(key, new LongWritable(count));
    }
}

ljo96ir5 1#

I think you are packaging this as a jar file. Just use the part-r-00000 file generated by the MapReduce output and create the table.
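Since the job uses TextOutputFormat, each line of part-r-00000 is a tab-separated key/value pair (the extracted line, then its count), which is the layout HBase's ImportTsv tool reads by default. A minimal sketch of that route, where the table name pdf_lines, the column family cf, and the <hdfs-output-dir> placeholder are assumptions rather than anything from the original post:

# in the HBase shell
create 'pdf_lines', 'cf'

# from the command line, pointing at the job's output directory
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  -Dimporttsv.columns=HBASE_ROW_KEY,cf:count \
  pdf_lines <hdfs-output-dir>

ImportTsv uses the first column (the extracted line) as the row key and stores the second column in cf:count.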

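If the goal is to have the very same program write into HBase directly, a common route is to keep PdfInputFormat and WordCountMapper unchanged and swap the file-based reducer for a TableReducer, letting TableMapReduceUtil wire the job to TableOutputFormat. The sketch below assumes the HBase 1.x client API and the same hypothetical pdf_lines table with column family cf; it is an illustration, not code from the original post.

//HBase output variant (sketch)
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfToHBaseDriver {

    // Counts how often each extracted line occurs and writes it to HBase as a Put.
    public static class HBaseWriterReducer
            extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // The mapper emitted one record per occurrence, so counting the values
            // gives the number of times this line appeared.
            long count = 0;
            for (LongWritable ignored : values) {
                count++;
            }
            // Row key = the extracted line; one column holds its count.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration picks up hbase-site.xml on top of the Hadoop configuration.
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "PdftextToHBase");
        job.setJarByClass(PdfToHBaseDriver.class);

        // Same PDF input path and mapper as the original job.
        job.setInputFormatClass(PdfInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Configures TableOutputFormat so the reducer's Puts land in the pdf_lines table.
        TableMapReduceUtil.initTableReducerJob("pdf_lines", HBaseWriterReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

TableOutputFormat does not create the table, so pdf_lines (with column family cf) has to exist before the job is submitted, and the HBase configuration must be visible on the machine that launches it.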