如何使用wholetextfiles读取spark中的gz文件

zfycwa2u  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(482)

我有一个文件夹,其中包含许多小.gz文件(压缩csv文本文件)。我需要在我的spark工作中读取它们,但问题是我需要根据文件名中的信息进行一些处理。因此,我没有使用:

JavaRDD<<String>String> input = sc.textFile(...)

因为据我所知,我没有访问文件名这种方式。相反,我用了:

JavaPairRDD<<String>String,String> files_and_content = sc.wholeTextFiles(...);

因为这样我得到了一对文件名和内容。然而,似乎这样,输入读取器无法从gz文件中读取文本,而是读取二进制乱码。
所以,我想知道我是否可以设置它以某种方式读取文本,或者使用 sc.textFile(...)

pgky5nke

pgky5nke1#

我在使用spark连接s3时也遇到了同样的问题。
我的文件是没有扩展名的gzip csv。

JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(logFile);

此方法返回currupted值
我用下面的代码解决了这个问题:

JavaPairRDD<String, String> fileNameContentsRDD = javaSparkContext.wholeTextFiles(logFile+".gz");

通过在s3url中添加.gz,spark自动选择文件并像读取gz文件一样读取它(这似乎是一种错误的方法,但解决了我的问题)。

w8biq8rn

w8biq8rn2#

不能用wholetextfiles读取gzip文件,因为它使用combinefileinputformat,而不能读取gzip文件,因为它们不可拆分(源代码证明了这一点):

override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }

你可以使用 newAPIHadoopFilewholefileinputformat (不是内置在hadoop中,而是通过互联网)使其正常工作。
更新1:我不认为wholefileinputformat可以工作,因为它只获取文件的字节,这意味着您可能需要编写自己的类,可能需要扩展wholefileinputformat以确保它解压缩字节。
另一个选择是使用gzipinputstream自己解压缩字节
更新2:如果你有权访问目录名,比如下面的评论,你可以得到所有这样的文件。

Path path = new Path("");
FileSystem fileSystem = path.getFileSystem(new Configuration()); //just uses the default one
FileStatus []  fileStatuses = fileSystem.listStatus(path);
ArrayList<Path> paths = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) paths.add(fileStatus.getPath());

相关问题