从reducer输出中删除尾部选项卡

u3r8eeie  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(250)

我有一个hadoop流作业,它的输出不包含键/值对。您可以将其视为仅值对或仅键对。
我的streaming reducer(一个php脚本)正在输出以换行符分隔的记录。hadoop流将其视为没有值的键,并在换行符之前插入一个选项卡。这个额外的标签是不需要的。
如何删除它?
我正在使用hadoop1.0.3和aws emr。我下载了hadoop 1.0.3的源代码,在hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/pipereducer.java中找到了以下代码:

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

所以我试着传球 -D stream.reduce.output.field.separator= 作为对这份工作毫无运气的争论。我也试过了 -D mapred.textoutputformat.separator= 以及 -D mapreduce.output.textoutputformat.separator= 没有运气。
当然,我在谷歌上搜索了一下,没有发现任何有用的东西。一个搜索结果甚至指出,没有任何参数可以通过传递来获得所需的结果(不过,这种情况下的hadoop版本确实很旧)。
以下是我的代码(为了便于阅读,添加了换行符):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
vzgqcmou

vzgqcmou1#

作为对其他人的帮助,使用上述技巧,我能够实现:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

“getrecordwriter”的内置实现只有一行更改为:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");

而不是:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");

将其编译到jar中,并将其包含到我的hadoop流调用中(通过hadoop流的说明),调用如下所示:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

我还将jar包含在发出调用的文件夹中。
它非常适合我的需要(而且在减速机后面的行末尾没有创建选项卡)。
更新:根据一条暗示这确实对其他人有帮助的评论,下面是我的customoutputformat.java文件的完整源代码:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}

仅供参考:对于您的使用上下文,请确保检查这不会对Map器和reducer之间的hadoop流式管理交互(在区分键和值方面)产生不利影响。澄清:
根据我的测试——如果在数据的每一行中都有一个“tab”(每边都有一些内容),那么可以保留内置的默认值:流式处理将第一个tab前面的第一个内容解释为“键”,然后该行后面的所有内容都解释为“值”。因此,流式处理不会看到“null值,'并且不会附加一个显示在您的减速机后面的选项卡(您将看到最终输出按“key”的值排序,流式处理将每行中的“key”解释为在每个选项卡之前发生的事情。)
相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到这些选项卡,对于这些选项卡,上述覆盖将成为修复。

dfty9e19

dfty9e192#

我也有这个问题。我使用的是一个python,纯Map的工作,基本上只是发送csv数据行。检查完输出后,我注意到每行末尾的\t。

foo,bar,baz\t

我发现Map器和python流都在处理键值对。如果不发出默认分隔符,则整行csv数据将被视为“键”,而需要键和值的框架将\t和空值加在一起。
因为我的数据本质上是一个csv字符串,所以我将stream和mapred输出的分隔符都设置为逗号。框架将第一个逗号之前的所有内容作为键读取,第一个逗号之后的所有内容作为值读取。然后,当它把结果写到文件中时,它写下了关键的逗号值,这有效地创建了我想要的输出。

foo,bar,baz

在我的例子中,我添加了以下内容以防止框架将\t添加到csv输出的末尾。。。

-D mapred.reduce.tasks=0 \
-D stream.map.output.field.separator=, \
-D mapred.textoutputformat.separator=, \
jgovgodb

jgovgodb3#

查看org.apache.hadoop.mapreduce.lib.output.textoutputformat源代码,我看到两件事:
这个 write(key,value) 方法在键或值不为null时写入分隔符
始终使用默认值设置分隔符( \t ),当 mapred.textoutputformat.separator 返回null(我假设发生在 -D stream.reduce.output.field.separator= 您唯一的解决方案可能是编写自己的outputformat来解决这两个问题。

我的测试

在我的一项任务中,我想重新格式化

id1|val1|val2|val3
id1|val1

分为:

id1|val1,val2,val3
id2|val1

我有一个自定义Map器(perl脚本)来转换这些行。对于这个任务,我最初尝试只作为键(或者只作为值)输入,但是得到了后面的标签的结果。
一开始我只是说:
-d stream.map.input.field.separator='|'-d stream.map.output.field.separator='|'
这给了Map器一个键,值对,因为我的Map无论如何都需要一个键。但是这个输出现在在第一个字段后面有一个标签
当我添加以下内容时,得到了所需的输出:
-d mapred.textoutputformat.separator='|'
如果我没有设置它或设置为空白
-d mapred.textoutputformat.separator分隔符=
然后我会在第一个字段之后再次得到一个标签。
当我查看textoutputformat的源代码时,这是有意义的

相关问题