untar文件包含多个不相关的csv文件,使用hadoop编程

agyaoht7  于 2021-05-16  发布在  Spark
关注(0)|答案(2)|浏览(371)

我的hdfs中有几个压缩文件(.tar.gz),其中包含不相关的tsv文件(类似于下面的列表)。我想以编程方式解压这些文件夹,可能利用mpp架构(例如hadoop或spark)并将它们保存到hdfs中。

- browser.tsv
- connection_type.tsv
- country.tsv
- color_depth.tsv
- javascript_version.tsv
- languages.tsv
- operating_systems.tsv
- plugins.tsv
- referrer_type.tsv
- resolution.tsv
- search_engine.tsv

到目前为止,我只能想出一个bash脚本,从hdfs下载每个文件,untars并将文件夹保存回hdfs。我甚至可以并行脚本,但我也不满意的解决方案。
谢谢:)
编辑:
很有意思的是,我们可以通过以下任何一种方法来解决问题:
Spark2.4.5
Hive2.3.6
清管器0.17.0
hadoop 2.8.5版本

kcrjzv8t

kcrjzv8t1#

我能看到的唯一方法是迭代每个文件,然后用spark读取,然后用spark本身将其写回hdfs,不压缩。类似这样的东西(使用pyspark):

for p in paths
    df = spark.read.csv(p, sep=r'\t', header=True)
    df.write.csv(p, sep=r'\t', header=True)

注意:我还没有测试过这段代码,在hdfs和tar文件中复制它很复杂,可能需要添加一些额外的参数来解析tar文件,但我希望这个想法很清楚。
imho不可能在一次迭代中同时读取所有这些文件,因为它们具有不同的结构(以及它们表示的不同数据)。

jw5wzhpr

jw5wzhpr2#

我终于找到了一个解决我的问题的方法,它由一个只支持mapper的hadoop作业组成。每个Map程序在tar文件夹中获取一个未压缩的文件,并使用 MultipleOutput 来自hadoop的实用程序。
此外,我实现了一个自定义的不可拆分hadoop输入格式来处理tarball提取,称为 TarballInputFormat .

public class TarballInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
                                                       TaskAttemptContext taskAttemptContext) {
        TarballRecordReader recordReader = new TarballRecordReader();
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }

}

tarballrecordreader处理原始tarball文件中所有文件的提取。

public class TarballRecordReader extends RecordReader<Text, Text> {

    private static final Log log = LogFactory.getLog(TarballRecordReader.class);

    private TarInputStream tarInputStream;
    private Text key;
    private Text value;
    private boolean finished = false;
    private String folderName;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        key = new Text();
        value = new Text();

        try {
            FileSplit split = (FileSplit) inputSplit;
            Configuration conf = taskAttemptContext.getConfiguration();

            Path tarballPath = split.getPath();
            folderName = tarballPath.getName().split("\\.")[0];
            FileSystem fs = tarballPath.getFileSystem(conf);
            FSDataInputStream fsInputStream = fs.open(tarballPath);

            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
            CompressionCodec codec = compressionCodecs.getCodec(tarballPath);

            tarInputStream = new TarInputStream(codec.createInputStream(fsInputStream));
        }
        catch (IOException ex) {
            log.error(ex.getMessage());
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException {

        TarEntry tarEntry = tarInputStream.getNextEntry();
        while (tarEntry != null && tarEntry.isDirectory())
            tarEntry = tarInputStream.getNextEntry();

        finished = tarEntry == null;
        if (finished) {
            return false;
        }

        key.clear();
        value.clear();

        long tarSize = tarEntry.getSize();

        int read;
        int offset = 0;
        int bufSize = (int) tarSize;
        byte[] buffer = new byte[bufSize];
        while ((read = tarInputStream.read(buffer, offset, bufSize)) != -1) offset += read;

        value.set(buffer);
        key.set(folderName + "/" + tarEntry.getName());

        return true;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return finished? 1: 0;
    }

    @Override
    public void close() throws IOException {
        if (tarInputStream != null) {
            tarInputStream.close();
        }
    }
}

每个tarball都将被提取出来,通过将每个文件相对于其父文件夹写入来保持原始结构。在这个解决方案中,我们使用Map器同时读取和写入提取的文件。这显然性能较差,但对于那些需要将提取的文件保存为原始格式(有序输出)的人来说,这可能是一个很好的折衷方案。另一种方法可以利用reducer将每个提取的文件行写入文件系统,这将以一致性(无序文件内容)为代价增加写入吞吐量。

public class ExtractTarball extends Configured implements Tool {

    public static final Log log = LogFactory.getLog(ExtractTarball.class);
    private static final String LOOKUP_OUTPUT = "lookup";

    public static class MapClass extends Mapper<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String filename = key.toString();

            int length = value.getBytes().length;
            System.out.printf("%s: %s%n", filename, length);

            mos.write(LOOKUP_OUTPUT, "", value, key.toString());
        }

        public void setup(Context context) {
            mos = new MultipleOutputs<>(context);
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ExtractTarball");
        job.setJarByClass(this.getClass());
        job.setMapperClass(MapClass.class);

        job.setInputFormatClass(TarballInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(0);

        MultipleOutputs.addNamedOutput(job, LOOKUP_OUTPUT, TextOutputFormat.class, Text.class, Text.class);

        log.isDebugEnabled();
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ExtractTarball(), args);
        System.out.println(exitCode);
        System.exit(exitCode);
    }
}

输出文件夹如下所示:

- output
    - lookup_data
        - .browser.tsv-m-00000.crc
        - .browser_type.tsv-m-00000.crc
        - .color_depth.tsv-m-00000.crc
        - .column_headers.tsv-m-00000.crc
        - .connection_type.tsv-m-00000.crc
        - .country.tsv-m-00000.crc
        - .event.tsv-m-00000.crc
        - .javascript_version.tsv-m-00000.crc
        - .languages.tsv-m-00000.crc
        - .operating_systems.tsv-m-00000.crc
        - .plugins.tsv-m-00000.crc
        - .referrer_type.tsv-m-00000.crc
        - .resolution.tsv-m-00000.crc
        - .search_engines.tsv-m-00000.crc
        - browser.tsv-m-00000
        - browser_type.tsv-m-00000
        - color_depth.tsv-m-00000
        - column_headers.tsv-m-00000
        - connection_type.tsv-m-00000
        - country.tsv-m-00000
        - event.tsv-m-00000
        - javascript_version.tsv-m-00000
        - languages.tsv-m-00000
        - operating_systems.tsv-m-00000
        - plugins.tsv-m-00000
        - referrer_type.tsv-m-00000
        - resolution.tsv-m-00000
        - search_engines.tsv-m-00000
- ._SUCCESS.crc
- .part-m-00000.crc
- _SUCCESS
- part-m-00000

相关问题