修改grep以解析sequence/snappy文件

ehxuflar  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(271)

我试图让grep示例与cdh捆绑在一起,以读取sequence/snappy文件。
默认情况下,程序在尝试读取序列/快照文件时抛出错误:
java.io.eofexception:org.apache.hadoop.io.compress.blockdecompressorstream.getcompresseddata(blockdecompressorstream)的输入流中出现意外的块结尾。java:121)
在org.apache.hadoop.io.compress.blockdecompressorstream.decompress(blockdecompressorstream。java:95)
在org.apache.hadoop.io.compress.decompressorstream.read(decompressorstream。java:83)
在java.io.inputstream.read(inputstream。java:82)
所以我编辑了代码来读取序列文件。
改变:

FileInputFormat.setInputPaths(grepJob, args[0]);

收件人:

FileInputFormat.setInputPaths(grepJob, args[0]);
    grepJob.setInputFormatClass(SequenceFileAsTextInputFormat.class);

但我还是犯了同样的错误。
1) 我需要手动设置输入压缩编解码器吗?我以为sequencefile阅读器会自动检测压缩。
2) 如果需要手动设置压缩,是使用“setinputformatclass”还是在“conf”对象中设置的?

ffvjumwh

ffvjumwh1#

让我的代码工作。我有点困惑,如何,但我没有指定压缩编解码器在任何地方的代码。以下是原始代码:

package org.myorg; 

import java.util.Random; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.map.InverseMapper; 
import org.apache.hadoop.mapreduce.lib.map.RegexMapper; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.io.SequenceFile.CompressionType; 
import org.apache.hadoop.io.SequenceFile.Metadata; 
import org.apache.hadoop.io.compress.*; 

/* Extracts matching regexs from input files and counts them. */ 
public class Grep extends Configured implements Tool { 
private Grep() {} // singleton 

public int run(String[] args) throws Exception { 
if (args.length < 3) { 
System.out.println("Grep <inDir> <outDir> <regex> [<group>]"); 
ToolRunner.printGenericCommandUsage(System.out); 
return 2; 
} 

Path tempDir = 
new Path("grep-temp-"+ 
Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); 

Configuration conf = getConf(); 
conf.set(RegexMapper.PATTERN, args[2]); 
if (args.length == 4) 
conf.set(RegexMapper.GROUP, args[3]); 

Job grepJob = new Job(conf); 

try { 

grepJob.setJobName("grep-search"); 

FileInputFormat.setInputPaths(grepJob, args[0]); 
grepJob.setInputFormatClass(SequenceFileAsTextInputFormat.class); 

grepJob.setMapperClass(RegexMapper.class); 

grepJob.setCombinerClass(LongSumReducer.class); 
grepJob.setReducerClass(LongSumReducer.class); 
FileOutputFormat.setOutputPath(grepJob, tempDir); 
grepJob.setOutputFormatClass(SequenceFileOutputFormat.class); 
grepJob.setOutputKeyClass(Text.class); 
grepJob.setOutputValueClass(LongWritable.class); 

grepJob.waitForCompletion(true); 

Job sortJob = new Job(conf); 
sortJob.setJobName("grep-sort"); 

FileInputFormat.setInputPaths(sortJob, tempDir); 
sortJob.setInputFormatClass(SequenceFileInputFormat.class); 

sortJob.setMapperClass(InverseMapper.class); 

sortJob.setNumReduceTasks(1); // write a single file 
FileOutputFormat.setOutputPath(sortJob, new Path(args[1])); 
sortJob.setSortComparatorClass( // sort by decreasing freq 
LongWritable.DecreasingComparator.class); 

sortJob.waitForCompletion(true); 
} 
finally { 
FileSystem.get(conf).delete(tempDir, true); 
} 
return 0; 
} 

public static void main(String[] args) throws Exception { 
int res = ToolRunner.run(new Configuration(), new Grep(), args); 
System.exit(res); 
} 

}

我很困惑,因为在一个简单的“猫”程序,我必须设置压缩编解码器。

package org.myorg;
import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.*;
import java.nio.charset.*;

public class seqcat {

public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path seqFilePath = new Path(uri);
    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFilePath));

    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
    CompressionCodec codec = ccf.getCodecByClassName(DefaultCodec.class.getName());

    while (reader.next(key, value)) {
                byte[] strBytes = ((BytesWritable) value).getBytes();
                int len = ((BytesWritable) value).getLength();
                String val = new String(strBytes,0,len,Charset.forName("UTF-8"));
                System.out.println(val);
           }

   reader.close();
  }
}

相关问题