我开始使用hadoop 3.1.2,我有如下数据集:
id station; city; temperature
1; New York; 14
3; New York; 20
2; Bristol; 29
8; Rome; -10
30; Bristol; 2
10; Rome; 0
1; New York; 10
8; Rome; 10
通过hadoop和mapreduce,我应该得到:按id站分组,以及平均温度。然而,我对所有的城市都不感兴趣,只对那些车站id=1,8的城市感兴趣,例如。。。
场景:计算城市的总/平均温度Map(key,value)--->key:id带有城市名称的站点,value:它们的温度。减:按id站+城市名称分组,取各站平均气温。
得到如下结果:
City - Station; Average Temperature
New York - 1; 7
Rome - 8; 0
代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SingleMapperReducer
{
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "City Temperature Job");
job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/*
Id, City, Temperature
1; New York; 14
3; New York; 20
2; Bristol; 29
1; Rome; 20
2; Rome; -10
2; Bristol; 2
3; New YOrk; 10
1; Rome; 10
*/
private static class TemperatureMapper
extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String txt = value.toString();
String[] tokens = txt.split(";");
String id = tokens[0].trim();
String temperature = tokens[2].trim();
if(id.equals("1"))
{
id="New York - 1";
}
else if(cat.equals("8"))
{
id="Rome - 8";
}
if (temperature.compareTo("Temperature") != 0)
context.write(new Text(id), new IntWritable(Integer.parseInt(temperature)));
}
}
private static class TemperatureReducer //on id city
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
int n = 0;
for (IntWritable val : values) {
sum += val.get();
n +=1;
}
result.set(sum/n);
context.write(key, result);
}
}
}
你觉得行吗?
代码的一部分,我开发的id站过滤器,是正确的吗?是否有其他方法应用此筛选器?
感谢那些想帮助我的人!
更新26/11
@蟋蟀007@amey shirke
谢谢!我尝试执行代码,进行建议的更改:
if (id.equals ("1") || id.equals ("8")) {
id = id + "-" + tokens [1];
context.write (new Text (id), new IntWritable (Integer.parseInt (temperature)));
}
还有更多:
Configuration conf = new Configuration ();
Job job = Job.getInstance (conf, "word count");
job.setJarByClass(SingleMapperReducer.class);
系统执行这个过程,但给我一个空的输出文件。
p、 我试过hadoop的mapreduce框架在一个很小的“wordcount”案例中,它是有效的。
会发生什么事?谢谢
1条答案
按热度按时间rdlzhqv91#
需要注意的几点:
如果您只关心ids1&8,那么只需要context.write()即可。与…一致的东西
这段代码没有帮助,也不是必需的
(temperature.compareTo("Temperature") != 0)