使用scala将文件从本地移动到hdfs时出错

roqulrg3  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(315)

我有一个scala列表:filenames,它包含本地目录中的文件名。前任:

fileNames(2)
res0: String = file:///tmp/audits/xx_user.log

我正在尝试使用scala从列表中移动一个文件:filenames,从local到hdfs。为此,我遵循以下步骤:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.commons.io.IOUtils;
val hadoopconf = new Configuration();
hadoopconf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
val fs = FileSystem.get(hadoopconf);
val outFileStream = fs.create(new Path("hdfs://mydev/user/devusr/testfolder"))

代码在这里之前运行良好。当我尝试添加inputstream时,得到如下错误消息:

val inStream = fs.open(new Path(fileNames(2)))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev

我还尝试直接指定文件名,结果相同:

val inStream = fs.open(new Path("file:///tmp/audits/xx_user.log"))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev

但当我尝试将文件直接加载到spark中时,它运行良好:

val localToSpark = spark.read.text(fileNames(2))
localToSpark: org.apache.spark.sql.DataFrame = [value: string]
localToSpark.collect
res1: Array[org.apache.spark.sql.Row] = Array([[Wed Dec 20 06:18:02 UTC 2017] INFO: ], [*********************************************************************************************************], [ ], [[Wed Dec 20 06:18:02 UTC 2017] INFO: Diagnostic log for xx_user.]

有没有人能告诉我在这一点上我犯了什么错误: val inStream = fs.open(new Path(fileNames(2))) 我得到了错误。

vnzz0bqm

vnzz0bqm1#

对于小文件, copyFromLocalFile() 足够了:

fs.copyFromLocalFile(new Path(localFileName), new Path(hdfsFileName))

对于大文件,使用apache commons io更有效:

IOUtils.copyLarge(
  new FileInputStream(new File(localFileName)), fs.create(new Path(hdfsFileName)))

请记住,本地文件名不应包含协议(因此 file:/// 在那里)

相关问题