"Starting flush of map output" takes a very long time in a Hadoop map task

zbq4xfa0 · posted 2021-06-03 in Hadoop

I am running a map task on a small file (3-4 MB), but the map output is relatively large (150 MB). After the map shows 100%, it takes a long time to finish the spill. Please suggest how I can reduce this period. Some sample logs follow...

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output
13/07/10 17:45:32 INFO mapred.JobClient:  map 98% reduce 0%
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient:  map 100% reduce 0%
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
...............................
...............................
...............................
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22
13/07/10 17:47:52 INFO mapred.JobClient:   File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Written=13401245
13/07/10 17:47:52 INFO mapred.JobClient:   FileSystemCounters
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_READ=18871098
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_READ=7346566
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=35878426
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=18621307
13/07/10 17:47:52 INFO mapred.JobClient:   File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Read=2558288
13/07/10 17:47:52 INFO mapred.JobClient:   Map-Reduce Framework
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input groups=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output materialized bytes=13320006
13/07/10 17:47:52 INFO mapred.JobClient:     Combine output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map input records=71040
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/07/10 17:47:52 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Spilled Records=1480000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output bytes=119998400
13/07/10 17:47:52 INFO mapred.JobClient:     CPU time spent (ms)=0
13/07/10 17:47:52 INFO mapred.JobClient:     Total committed heap usage (bytes)=1178009600
13/07/10 17:47:52 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Combine input records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     Map output records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     SPLIT_RAW_BYTES=122
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input records=740000

Map task source code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{

    private DoubleWritable distGexpr = new DoubleWritable();
    private LongWritable m2keyOut = new LongWritable();
    int trMax,tstMax;

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {

        Configuration conf =context.getConfiguration();
        tstMax = conf.getInt("mtst", 10);
        trMax = conf.getInt("mtr", 10);

    }

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException {
        String line = values.toString();

        double Tij=0.0,TRij=0.0, dist=0;
        int i=0,j;
        long m2key=0;
        String[] SLl = new String[]{};

        Configuration conf =context.getConfiguration();

        m2key = Long.parseLong(key.toString());
        StringTokenizer tokenizer = new StringTokenizer(line);
        j=0;
        while (tokenizer.hasMoreTokens()) {

            String test = tokenizer.nextToken();
            if(j==0){
                Tij = Double.parseDouble(test);
            }
            else if(j==1){
                TRij = Double.parseDouble(test);
            }
            else if(j==2){
                SLl = StringUtils.split(conf.get(test),",");
            }
            j++;
        }
        //Map input ends

        //Distance Measure function
        dist = (long)Math.pow( (Tij - TRij), 2);

        //remove gid from key 
        m2key = m2key / 100000;
        //Map2 <key,value> emit starts
        for(i=0; i<SLl.length;i++){
            long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key;
            m2keyOut.set(m2keyNew);
            distGexpr.set(dist);
            context.write(m2keyOut,distGexpr);
        }
        //<key,value> emit done
    }

}

Sample map input: the last field on each line selects an integer array from a broadcast variable. Each input line produces roughly 100-200 output records.

10100014    1356.3238 1181.63 gs-4-56
10100026    3263.1167 3192.4131 gs-3-21
10100043    1852.0 1926.3962 gs-4-76
10100062    1175.5925 983.47125 gs-3-19
10100066    606.59125 976.26625 gs-8-23

Sample map output:

10101   8633.0
10102   1822.0
10103   13832.0
10104   2726470.0
10105   1172991.0
10107   239367.0
10109   5410384.0
10111   7698352.0
10112   6.417
Answer 1 (wmomyfyw):

I guess you have solved this by now (two years after the original message was posted), but for anyone who runs into the same problem, I will try to offer a few suggestions.
Judging from your counters, I can see that you are already using compression (since the number of map output materialized bytes differs from the map output bytes), which is a good thing. You can compress the mapper's output further by using the variable-length encoded VLongWritable class as the map output key type. (If I'm not mistaken, there also used to be a VDoubleWritable class, but it has been deprecated by now.)
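For illustration, a minimal sketch of what that change could look like, reusing the class and field names from the question (only the key type changes, and the driver must declare the new map output key class):

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class GsMR2MapThree extends Mapper<Text, Text, VLongWritable, DoubleWritable> {
    // VLongWritable serializes small values in fewer bytes than the
    // fixed 8-byte LongWritable, shrinking the spilled map output.
    private VLongWritable m2keyOut = new VLongWritable();
    private DoubleWritable distGexpr = new DoubleWritable();
    // ... the map() body stays the same; VLongWritable.set(long)
    // drops in where LongWritable.set(long) was used.
}

and in the driver:

job.setMapOutputKeyClass(VLongWritable.class);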
In the for loop that emits the output, there is no need to set the distGexpr variable on every iteration; it is always the same, so set it once before the loop. You can also store the product trMax*tstMax in a long instead of computing it on every iteration, as sketched below.
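Concretely, the emit loop could be restructured like this (a sketch using the names from the question; casting the product to long once also avoids a possible int overflow):

distGexpr.set(dist);                        // loop-invariant value, set once
final long keyBase = (long) trMax * tstMax; // computed once, widened to long
for (i = 0; i < SLl.length; i++) {
    m2keyOut.set(Integer.parseInt(SLl[i]) * keyBase + m2key);
    context.write(m2keyOut, distGexpr);
}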
If possible, make your input key a LongWritable (produced by the previous job) so that you can save the Long.parseLong() and Text.toString() calls.
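A sketch of the changed signature, assuming the upstream job can be made to emit LongWritable keys (the class declaration becomes Mapper<LongWritable, Text, ...> accordingly):

public void map(LongWritable key, Text values, Context context)
        throws IOException, InterruptedException {
    long m2key = key.get(); // replaces Long.parseLong(key.toString())
    // ... rest of the map body unchanged
}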
If possible (depending on your reducer), use a combiner to reduce the size of the spilled bytes.
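Driver-side that is a one-line change (the reducer class name here is hypothetical, and a combiner is only valid when it computes a partial result that the reducer can still aggregate correctly):

job.setCombinerClass(DistSumReducer.class); // hypothetical associative reducer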
I couldn't find a way to skip the Integer.parseInt() call inside the for loop, but it would help if you could load SLl as an int[] in the first place; one possible approach is sketched below.
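One way to approximate that is to cache the parsed arrays per distinct list name inside the mapper, so each configured list is parsed once rather than once per record that references it (a sketch; the field and method names are made up for illustration, and it assumes java.util.Map/HashMap are imported):

private final Map<String, int[]> listCache = new HashMap<String, int[]>();

private int[] lookupList(Configuration conf, String name) {
    int[] cached = listCache.get(name);
    if (cached == null) {
        String[] parts = StringUtils.split(conf.get(name), ",");
        cached = new int[parts.length];
        for (int k = 0; k < parts.length; k++) {
            cached[k] = Integer.parseInt(parts[k]);
        }
        listCache.put(name, cached); // parsed once per distinct list
    }
    return cached;
}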
