为什么输出文件中没有mapreduce矩阵和向量乘法?

xlpyo6sf  于 2021-07-13  发布在  Hadoop
关注(0)|答案(0)|浏览(263)

我想用mapreduce把一个1000x1000的矩阵和一个1000x1的向量相乘。显示为图片的输入文件由随机的单个数字变量组成,并用空格分隔。请在此处输入图像描述
我的代码如下:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Matrix_Vector {

    public static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private String flag = null;// name of the data set
        private int rowNum = 1000;// row of matrix A
        private int colNum = 1;//cols of matrix B (for vector is one)
        private int rowIndexA = 1; // row index of matrix A
        private int rowIndexB = 1; // row index of matrix B

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            flag = ((FileSplit) context.getInputSplit()).getPath().getName();// Get the file name
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(" ");
            if ("matrix".equals(flag)) {
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 0; j < tokens.length; j++) {
                        Text v = new Text("a," + (j + 1) + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexA++;// move to next line
            } else if ("vector".equals(flag)) {
                for (int i = 1; i <= rowNum; i++) {
                    for (int j = 0; j < tokens.length; j++) {
                        Text k = new Text(i + "," + (j + 1));
                        Text v = new Text("b," + rowIndexB + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexB++;// move to next line
            }
        }
    }

    public static class MatrixReducer extends
            Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text value : values) {
                String[] val = value.toString().split(",");
                if ("a".equals(val[0])) {
                    mapA.put(val[1], val[2]);
                } else if ("b".equals(val[0])) {
                    mapB.put(val[1], val[2]);
                }
            }

            int result = 0;
            Iterator<String> mKeys = mapA.keySet().iterator();
            while (mKeys.hasNext()) {
                String mkey = mKeys.next();
                if (mapB.get(mkey) == null) {
                    continue;
                }
                result += Integer.parseInt(mapA.get(mkey))
                        * Integer.parseInt(mapB.get(mkey));
            }
            context.write(key, new IntWritable(result));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //job start running
        long main_start=System.currentTimeMillis();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) 
        {
          System.err.println("Usage: MatrixVector <in> <out>");
          System.exit(2);
        }

        Job job = Job.getInstance(conf, "Matrix_Vector");
        job.setJarByClass(Matrix_Vector.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        if (job.waitForCompletion(true)) { 
               long main_end=System.currentTimeMillis();      
               System.out.println("The process is end");
               System.out.println("Time of running is "+ " :  "+(main_end - main_start) +"  ms");
               System.exit(0);
           }else {
               System.out.println("The process ended with false");
           }

    }
}

这是我得到的结果,在我的输出路径中有一个成功文件和一个输出文件,但是在输出文件中没有任何内容,也可以从“bytes writed=0”中看到。我找不到我的代码逻辑哪里出错了,很抱歉结果显示为代码格式,我仍然在学习如何做更好的职位。

21/03/26 08:05:08 INFO mapreduce.Job: Job job_1616694898641_0007 completed successfully
21/03/26 08:05:10 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=6
                FILE: Number of bytes written=606314
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2002216
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=102529
                Total time spent by all reduces in occupied slots (ms)=13848
                Total time spent by all map tasks (ms)=102529
                Total time spent by all reduce tasks (ms)=13848
                Total vcore-milliseconds taken by all map tasks=102529
                Total vcore-milliseconds taken by all reduce tasks=13848
                Total megabyte-milliseconds taken by all map tasks=104989696
                Total megabyte-milliseconds taken by all reduce tasks=14180352
        Map-Reduce Framework
                Map input records=2000
                Map output records=0
                Map output bytes=0
                Map output materialized bytes=12
                Input split bytes=212
                Combine input records=0
                Combine output records=0
                Reduce input groups=0
                Reduce shuffle bytes=12
                Reduce input records=0
                Reduce output records=0
                Spilled Records=0
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=658
                CPU time spent (ms)=4140
                Physical memory (bytes) snapshot=519413760
                Virtual memory (bytes) snapshot=6241820672
                Total committed heap usage (bytes)=263872512
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=2002004
        File Output Format Counters
                Bytes Written=0
The process is end
Time of running is  :  182011  ms

任何帮助都将不胜感激!
谢谢您,

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题