fileutil.copymerge()

vltsax25  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(339)

我装了一个 DataFrame 进入 HDFS 作为 text 使用下面的代码格式化。 finalDataFrameDataFrame ```
finalDataFrame.repartition(1).rdd.saveAsTextFile(targetFile)

执行上述代码后,我发现用我提供的文件名创建了一个目录,在该目录下创建了一个文件,但不是文本格式。文件名类似于第00000部分。
我已经解决了这个问题 `HDFS` 使用下面的代码。

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)

现在我可以用给定的文件名获取上述路径中的文本文件。
但是当我尝试在s3中执行相同的操作时,它显示出一些异常

FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)

java.lang.IllegalArgumentException: Wrong FS:
s3a://globalhadoop/data, expected:
hdfs://*****.aws.*.com:8050

这里似乎不支持s3路径。有谁能帮忙做这件事吗。
nue99wik

nue99wik1#

我用下面的代码解决了这个问题。

def createOutputTextFile(srcPath: String, dstPath: String, s3BucketPath: String): Unit = {
    var fileSystem: FileSystem = null
    var conf: Configuration = null
    if (srcPath.toLowerCase().contains("s3a") || srcPath.toLowerCase().contains("s3n")) {
      conf = sc.hadoopConfiguration
      fileSystem = FileSystem.get(new URI(s3BucketPath), conf)
    } else {
      conf = new Configuration()
      fileSystem = FileSystem.get(conf)
    }
    FileUtil.copyMerge(fileSystem, new Path(srcPath), fileSystem, new Path(dstPath), true, conf, null)
  }

我已经为s3和hdfs的文件系统编写了代码,它们都工作得很好。

mzaanser

mzaanser2#

您将hdfs文件系统作为目标文件系统传入 FileUtil.copyMerge . 您需要获得目的地的真实fs,您可以通过调用 Path.getFileSystem(Configuration) 在您创建的目标路径上。

相关问题