hadoop项目-寻找某一天消费量最大的客户

pdsfdshx  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(124)

我正在用hadoop创建一个项目。我不太了解hadoop和mapreducing过程,所以我的问题是:我有一个三列的大数据集;客户id、时间戳(yyyy-mm-dd hh:mm:ss)和消费。我想找到客户,在某一天的最大消费量,我会选择。
这是我的密码:
我的司机

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class alicanteDriver {

    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;

        Configuration conf = new Configuration();//JobConf(WordCountDriver.class);//
        Job job = Job.getInstance(conf, "WorCount");
        job.setJarByClass(alicanteDriver.class);
        //job.setMapperClass(alicanteMapper.class);
        job.setMapperClass(alicanteMapperC.class);        

        //job.setCombinerClass(alicanteCombiner.class);

        job.setPartitionerClass(alicantePartitioner.class);
        //job.setPartitionerClass(alicantePartitionerC.class);

        job.setNumReduceTasks(8);
        job.setReducerClass(alicanteReducer.class);
        //job.setReducerClass(alicanteReducerC.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);
        t_end = System.currentTimeMillis();

        System.out.println((t_end-t_start)/1000);
    }
 }

我的Map器

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class alicanteMapperC extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    Date d2 = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;

    // temp variables
    int max = 0;
    String cust = new String();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        // find max
        if (counter > 0) {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");

            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                Consumption.set(Integer.parseInt(itr.nextToken()));
                try {
                    t = ft.parse(itr.nextToken());
                    if (t.compareTo(d2) == 0) {
                        max = Math.max(max, Consumption.get());
                    }
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        counter++;
    }
}

我的减速机

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class alicanteReducer extends Reducer<Text, Text, Text, IntWritable> {   
    int Consumption;
    String Customer;
    int max;
    IntWritable sum_consumption = new IntWritable();
    IntWritable ca = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {

          //int sum = 0;      
          for (IntWritable val:values){ 
              max = val.get();
              Consumption = val.get();
              Customer = val.toString();
              if(Consumption == max){
                  context.write(new Text(Customer), ca);
              }
              //sum += Consumption;
          }
          //sum_consumption.set(sum);
          //context.write(new Text(Integer.toString(max)), sum_consumption); 
      }
    }

如你所见,我能找到某一天的最大消耗量。我无法做到的是,将客户与这一天的最大消费量相关联,并将其ID写入输出文件。
在mapper类中,我尝试运行了一秒钟,使用max来查找客户,但是时间太长了。
你对课程的结构有什么看法吗?提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题