我有一个scala列表:filenames,它包含本地目录中的文件名。前任:
fileNames(2)
res0: String = file:///tmp/audits/xx_user.log
我正在尝试使用scala从列表中移动一个文件:filenames,从local到hdfs。为此,我遵循以下步骤:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.commons.io.IOUtils;
val hadoopconf = new Configuration();
hadoopconf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
val fs = FileSystem.get(hadoopconf);
val outFileStream = fs.create(new Path("hdfs://mydev/user/devusr/testfolder"))
代码在这里之前运行良好。当我尝试添加inputstream时,得到如下错误消息:
val inStream = fs.open(new Path(fileNames(2)))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev
我还尝试直接指定文件名,结果相同:
val inStream = fs.open(new Path("file:///tmp/audits/xx_user.log"))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev
但当我尝试将文件直接加载到spark中时,它运行良好:
val localToSpark = spark.read.text(fileNames(2))
localToSpark: org.apache.spark.sql.DataFrame = [value: string]
localToSpark.collect
res1: Array[org.apache.spark.sql.Row] = Array([[Wed Dec 20 06:18:02 UTC 2017] INFO: ], [*********************************************************************************************************], [ ], [[Wed Dec 20 06:18:02 UTC 2017] INFO: Diagnostic log for xx_user.]
有没有人能告诉我在这一点上我犯了什么错误: val inStream = fs.open(new Path(fileNames(2)))
我得到了错误。
1条答案
按热度按时间vnzz0bqm1#
对于小文件,
copyFromLocalFile()
足够了:对于大文件,使用apache commons io更有效:
请记住,本地文件名不应包含协议(因此
file:///
在那里)