我正在用hadoop创建一个项目。我不太了解hadoop和mapreducing过程,所以我的问题是:我有一个三列的大数据集;客户id、时间戳(yyyy-mm-dd hh:mm:ss)和消费。我想找到客户,在某一天的最大消费量,我会选择。
这是我的代码:
我的Driver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class alicanteDriver {
public static void main(String[] args) throws Exception {
long t_start = System.currentTimeMillis();
long t_end;
Configuration conf = new Configuration();//JobConf(WordCountDriver.class);//
Job job = Job.getInstance(conf, "WorCount");
job.setJarByClass(alicanteDriver.class);
//job.setMapperClass(alicanteMapper.class);
job.setMapperClass(alicanteMapperC.class);
//job.setCombinerClass(alicanteCombiner.class);
job.setPartitionerClass(alicantePartitioner.class);
//job.setPartitionerClass(alicantePartitionerC.class);
job.setNumReduceTasks(8);
job.setReducerClass(alicanteReducer.class);
//job.setReducerClass(alicanteReducerC.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
job.waitForCompletion(true);
t_end = System.currentTimeMillis();
System.out.println((t_end-t_start)/1000);
}
}
我的Mapper类
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class alicanteMapperC extends
Mapper<LongWritable, Text, Text, IntWritable> {
String Customer = new String();
SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date t = new Date();
Date d2 = new Date();
IntWritable Consumption = new IntWritable();
int counter = 0;
// temp variables
int max = 0;
String cust = new String();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
try {
d2 = ft.parse("2013-07-01 01:00:00");
} catch (ParseException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// find max
if (counter > 0) {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line, ",");
while (itr.hasMoreTokens()) {
Customer = itr.nextToken();
Consumption.set(Integer.parseInt(itr.nextToken()));
try {
t = ft.parse(itr.nextToken());
if (t.compareTo(d2) == 0) {
max = Math.max(max, Consumption.get());
}
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
counter++;
}
}
我的Reducer类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class alicanteReducer extends Reducer<Text, Text, Text, IntWritable> {
int Consumption;
String Customer;
int max;
IntWritable sum_consumption = new IntWritable();
IntWritable ca = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//int sum = 0;
for (IntWritable val:values){
max = val.get();
Consumption = val.get();
Customer = val.toString();
if(Consumption == max){
context.write(new Text(Customer), ca);
}
//sum += Consumption;
}
//sum_consumption.set(sum);
//context.write(new Text(Integer.toString(max)), sum_consumption);
}
}
如你所见,我能找到某一天的最大消耗量。我无法做到的是,将客户与这一天的最大消费量相关联,并将其ID写入输出文件。
在mapper类之后,我还尝试运行第二个作业,利用max值反查对应的客户,但是耗时太长了。
你对这些类的结构有什么建议吗?提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!