有人能告诉我下面的代码有什么问题吗。
你能帮我怎么用这个mapreduce程序得到下面的输出吗??实际上,这段代码工作正常,但输出不是预期的。。。输出在两个文件中生成,但在name.txt文件或age.txt文件中,输出是交换的
输入文件:
Name:A
Age:28
Name:B
Age:25
Name:K
Age:20
Name:P
Age:18
Name:Ak
Age:11
Name:N
Age:14
Name:Kr
Age:26
Name:Ra
Age:27
我的输出应该分为姓名和年龄
文件名:
Name:A
Name:B
Name:K
Name:P
Name:Ak
Name:N
Name:Kr
Name:Ra
年龄文件:
Age:28
Age:25
Age:20
Age:18
Age:11
Age:14
Age:26
Age:27
我的代码:
mymapper.java文件
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String [] dall=value.toString().split(":");
output.collect(new Text(dall[0]),new Text(dall[1]));
}
}
myreducer.java文件:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MyReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
output.collect(new Text(key),new Text(values.next()));
}
}
}
多文件输出.java:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.*;
public class MultiFileOutput extends MultipleTextOutputFormat<Text, Text>{
protected String generateFileNameForKeyValue(Text key, Text value,String name) {
//return new Path(key.toString(), name).toString();
return key.toString();
}
protected Text generateActualKey(Text key, Text value) {
//return new Text(key.toString());
return null;
}
}
mydriver.java文件:
import java.io.IOException;
import java.lang.Exception;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
public class MyDriver{
public static void main(String[] args) throws Exception,IOException {
Configuration mycon=new Configuration();
JobConf conf = new JobConf(mycon,MyDriver.class);
//JobConf conf = new JobConf(MyDriver.class);
conf.setJobName("Splitting");
conf.setMapperClass(MyMapper.class);
conf.setReducerClass(MyReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(MultiFileOutput.class);
conf.setOutputKeyClass(Text.class);
conf.setMapOutputKeyClass(Text.class);
//conf.setOutputValueClass(Text.class);
conf.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
JobClient.runJob(conf);
//System.err.println(JobClient.runJob(conf));
}
}
谢谢你
1条答案
按热度按时间jpfvwuh41#
好吧,这个用例比简单的字数计算要复杂一些:)
所以您需要的是一个复杂的键&一个分区器。并设置减速器的数量=2
您的复合键可以是文本(name | a或age | 28的串联)或customwriteable(有2个示例变量包含type(name或age)&value)
在Map器中,创建文本或自定义可写,并将其设置为输出键,值可以只是人员的姓名或年龄。
创建一个分区器(它实现org.apache.hadoop.mapred.partitioner)。在getpartition方法中,基本上根据键来决定它将转到哪个reducer。
希望这有帮助。