Decompress (unzip / extract) util using Spark Scala

vmpqdwk3  · posted 2021-05-29 in Spark

I have customer_input_data.tar.gz in HDFS, which contains the data of 10 different tables in csv file format. I need to extract this file to /my/output/path using Spark Scala.
Please suggest how to extract the customer_input_data.tar.gz file using Spark Scala.


zzzyeukh1#

I developed the code below to extract the file using Scala. You need to pass the input path and the output path along with the Hadoop FileSystem.

import java.io.{BufferedOutputStream, IOException}

import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.{FileSystem, Path}

private val BUFFER_SIZE = 4096

/* Below method is used for extracting a .tar.gz archive from HDFS into HDFS. */
@throws[IOException]
private def processTargz(fullpath: String, houtPath: String, fs: FileSystem): Unit = {
  val path = new Path(fullpath)
  // GzipCompressorInputStream removes the gzip layer, TarArchiveInputStream the tar layer
  val gzipIn = new GzipCompressorInputStream(fs.open(path))
  val tarIn = new TarArchiveInputStream(gzipIn)
  try {
    // Entries are extracted under <houtPath>/<archive name without extension>
    val fileName1 = FilenameUtils.getName(fullpath)
    val tarNamesFolder = fileName1.substring(0, fileName1.indexOf('.'))
    println("Tar name entry: " + fileName1)
    println("Folder name: " + tarNamesFolder)

    var entry: TarArchiveEntry = tarIn.getNextTarEntry
    while (entry != null) {
      // Entry name is the file (e.g. a csv) inside the compressed tar archive
      println("ENTITY NAME: " + entry.getName)
      val targetPath = new Path(houtPath + "/" + tarNamesFolder + "/" + entry.getName)

      if (entry.isDirectory) {
        /** If the entry is a directory, create the directory in HDFS. **/
        if (!fs.mkdirs(targetPath)) {
          printf("Unable to create directory '%s', during extraction of archive contents.%n", targetPath)
        }
      } else {
        // Otherwise stream the entry's bytes into a new HDFS file
        val dest = new BufferedOutputStream(fs.create(targetPath, true), BUFFER_SIZE)
        try {
          val data = new Array[Byte](BUFFER_SIZE)
          var count = tarIn.read(data, 0, BUFFER_SIZE)
          while (count != -1) {
            dest.write(data, 0, count)
            count = tarIn.read(data, 0, BUFFER_SIZE)
          }
        } finally {
          dest.close()
        }
      }
      entry = tarIn.getNextTarEntry
    }
    println("Untar completed successfully!")
  } finally {
    // Closing the tar stream also closes the wrapped gzip and HDFS streams
    tarIn.close()
  }
}
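
For completeness, a minimal sketch of how this helper might be invoked from a Spark driver. The object name and both paths below are placeholders, and the FileSystem is obtained from the job's Hadoop configuration; note that the extraction itself runs on the driver only.

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

object UntarJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("untar-example").getOrCreate()
    // Build the FileSystem from the active Hadoop configuration
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Placeholder paths: the tar.gz in HDFS and the target output directory
    processTargz("/data/customer_input_data.tar.gz", "/my/output/path", fs)

    spark.stop()
  }
}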

rn0zuynd2#

gzip is not a splittable format in Hadoop. So the file will not really be distributed across the cluster, and you will not get any benefit from distributed computation/processing in Hadoop or Spark.
A better approach would be to
decompress the file at the OS level and then send the files individually back to Hadoop.
If you still want to decompress it in Scala, you can simply resort to the Java class GZIPInputStream via

new GZIPInputStream(new FileInputStream("your file path"))
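
As a rough sketch of that approach, assuming the archive has already been copied to the local filesystem (both paths below are placeholders): keep in mind that GZIPInputStream only strips the gzip layer, so a .tar.gz still needs a tar reader such as the one in the first answer.

import java.io.{BufferedOutputStream, FileInputStream, FileOutputStream}
import java.util.zip.GZIPInputStream

// Placeholder local paths for the compressed input and the decompressed output
val in = new GZIPInputStream(new FileInputStream("/tmp/customer_input_data.csv.gz"))
val out = new BufferedOutputStream(new FileOutputStream("/tmp/customer_input_data.csv"))
try {
  val buffer = new Array[Byte](4096)
  var count = in.read(buffer)
  while (count != -1) {
    out.write(buffer, 0, count)
    count = in.read(buffer)
  }
} finally {
  in.close()
  out.close()
}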
