Multiple output formats in Hadoop

jk9hmnmh · asked 2022-12-11 · Hadoop

I am new to Hadoop and I am trying out the WordCount program.
Now, to test writing to multiple output files, I am using MultipleOutputs. This link helped me do it: http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
In my driver class I have:

MultipleOutputs.addNamedOutput(conf, "even",
        org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
        IntWritable.class);

MultipleOutputs.addNamedOutput(conf, "odd",
        org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
        IntWritable.class);

And my reducer class became this:

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    MultipleOutputs mos = null;

    public void configure(JobConf job) {
        mos = new MultipleOutputs(job);
    }

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        if (sum % 2 == 0) {
            mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
        }else {
            mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
        }
        //output.collect(key, new IntWritable(sum));
    }
    @Override
    public void close() throws IOException {
        mos.close();
    }
}

Things work, but I get a lot of files (one odd file and one even file for each reducer).
The question is: how can I have only two output files (odd and even), so that the odd output of every reducer is written to that one odd file, and likewise for even?

czq61nw1 · Answer 1

Each reducer uses an OutputFormat to write its records. That is why each reducer produces its own set of odd and even files. This is by design, so that every reducer can perform its writes in parallel.
If you only want a single odd file and a single even file, you need to set mapred.reduce.tasks to 1. But performance will suffer, because all the mappers will then feed into a single reducer.
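A minimal sketch of that setting with the old mapred API, assuming conf is the JobConf from the question's driver (this line is my addition, not part of the original answer):

// Route everything through a single reducer so only one "even" and one "odd" file are produced.
conf.setNumReduceTasks(1); // same effect as setting mapred.reduce.tasks=1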
Another option is to change the process that reads these files so it accepts multiple input files, or to write a separate process that merges the files together.
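One possible sketch of such a merge process uses FileUtil.copyMerge, which is available in Hadoop 1.x/2.x (it was removed in 3.x). The paths below are placeholders, and note that copyMerge concatenates every file in the source directory, so the even and odd parts would first have to be moved into separate directories:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class MergeParts {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Concatenate every file under the source directory into one destination file.
        // false = keep the source files; null = no separator string between files.
        FileUtil.copyMerge(fs, new Path("/wordcount/output"),
                fs, new Path("/wordcount/merged/all-counts.txt"),
                false, conf, null);
    }
}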

dvtswwa3 · Answer 2

I wrote a class for this. Use it in your job:

job.setOutputFormatClass(m_customOutputFormatClass);

And here is the class:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br>
 * <p>
 * <b>WARNING</b>: The number of different folders shouldn't be large for one mapper, since we keep a
 * {@link RecordWriter} instance per folder name.
 * </p>
 * <p>
 * In this class the folder name is defined by the written entry's key.<br>
 * To change this behavior simply extend this class and override the
 * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
 * {@link FolderNameExtractor} implementation.
 * </p>
 * 
 * 
 * @author ykesten
 * 
 * @param <K> - Keys type
 * @param <V> - Values type
 */
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

    private String folderName;

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

        private Map<String, RecordWriter<K, V>> fileNameToWriter;
        private FolderNameExtractor<K, V> fileNameExtractor;
        private TaskAttemptContext job;

        public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
            fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
            this.fileNameExtractor = fileNameExtractor;
            this.job = job;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String fileName = fileNameExtractor.extractFolderName(key, value);
            RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
            if (writer == null) {
                writer = createNewWriter(fileName, fileNameToWriter, job);
                if (writer == null) {
                    throw new IOException("Unable to create writer for path: " + fileName);
                }
            }
            writer.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                entry.getValue().close(context);
            }
        }

    }

    private synchronized RecordWriter<K, V> createNewWriter(String folderName,
            Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
        try {
            this.folderName = folderName;
            RecordWriter<K, V> writer = super.getRecordWriter(job);
            this.folderName = null;
            fileNameToWriter.put(folderName, writer);
            return writer;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        Path path = super.getDefaultWorkFile(context, extension);
        if (folderName != null) {
            String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
            path = new Path(newPath);
        }
        return path;
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
    }

    public FolderNameExtractor<K, V> getFolderNameExtractor() {
        return new KeyFolderNameExtractor<K, V>();
    }

    public interface FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value);
    }

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value) {
            return key.toString();
        }
    }

}
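As the Javadoc above suggests, the folder name can be derived from something other than the key by overriding getFolderNameExtractor(). A minimal sketch for the even/odd split from the question (the subclass name and the even/odd rule are mine, not the answer's):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical subclass: routes each record into an "even" or "odd" folder
// based on the summed count (the value) instead of the key.
public class EvenOddOutputFormat extends HdMultipleFileOutputFormat<Text, IntWritable> {
    @Override
    public FolderNameExtractor<Text, IntWritable> getFolderNameExtractor() {
        return new FolderNameExtractor<Text, IntWritable>() {
            @Override
            public String extractFolderName(Text key, IntWritable value) {
                return (value.get() % 2 == 0) ? "even" : "odd";
            }
        };
    }
}

With that, job.setOutputFormatClass(EvenOddOutputFormat.class) would take the place of the m_customOutputFormatClass placeholder above.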

raogr8fs · Answer 3

Multiple output files are generated according to the number of reducers.
You can use hadoop dfs -getmerge <hdfs output dir> <local file> to merge the output into a single local file.

olqngx59 · Answer 4

You can try changing the output file name (the reducer output). Since HDFS only supports append operations, all of the temporary part-r-0000x files (partitions) from the reducers will then be collected and placed into a single file.
Here is the class you need to create; it overrides methods of TextOutputFormat:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class CustomNameMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

    private String folderName;

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

        private Map<String, RecordWriter<K, V>> fileNameToWriter;
        private FolderNameExtractor<K, V> fileNameExtractor;
        private TaskAttemptContext job;

        public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
            fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
            this.fileNameExtractor = fileNameExtractor;
            this.job = job;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String fileName = "**[FOLDER_NAME_INCLUDING_SUB_DIRS]**";//fileNameExtractor.extractFolderName(key, value);
            
            RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
            if (writer == null) {
                writer = createNewWriter(fileName, fileNameToWriter, job);
                if (writer == null) {
                    throw new IOException("Unable to create writer for path: " + fileName);
                }
            }
            writer.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                entry.getValue().close(context);
            }
        }

    }

    private synchronized RecordWriter<K, V> createNewWriter(String folderName,
            Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
        try {
            this.folderName = folderName;
            RecordWriter<K, V> writer = super.getRecordWriter(job);
            this.folderName = null;
            fileNameToWriter.put(folderName, writer);
            return writer;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        Path path = super.getDefaultWorkFile(context, extension);
        if (folderName != null) {
            String newPath = path.getParent().toString() + "/" + folderName + "/[ONE_FILE_NAME]";
            path = new Path(newPath);
        }
        return path;
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
    }

    public FolderNameExtractor<K, V> getFolderNameExtractor() {
        return new KeyFolderNameExtractor<K, V>();
    }

    public interface FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value);
    }

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value) {
            return key.toString();
        }
    }

}

Then in your reducer/mapper:

public static class ExtraLabReducer extends Reducer<CustomKeyComparable, Text, CustomKeyComparable, Text>
{
    MultipleOutputs multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs(context);
    }

    @Override
    public void reduce(CustomKeyComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        for (Text d : values)
        {
            multipleOutputs.write("batta", key, d, "[EXAMPLE_FILE_NAME]");
        }
    }
    
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
    
}

Then in the job configuration:

    Job job = new Job(getConf(), "ExtraLab");
    job.setJarByClass(ExtraLab.class);

    job.setMapperClass(ExtraLabMapper.class);
    job.setReducerClass(ExtraLabReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    
    job.setMapOutputKeyClass(CustomKeyComparable.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    //adding one more reducer
    job.setNumReduceTasks(2);
    
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    MultipleOutputs.addNamedOutput(job,"batta", CustomNameMultipleFileOutputFormat.class,CustomKeyComparable.class,Text.class);
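
The snippet stops before the output path and job submission; a typical ending (my assumption, not part of the original answer) would be:

    // Assumed continuation: set the output directory and submit the job.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);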
