在spark/scala中循环和处理多个hdfs文件

5uzkadbs  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(371)

我的hdfs文件夹中有多个文件,我想在上面循环并运行scala转换逻辑。
我正在使用下面的脚本,它在使用本地文件的开发环境中运行良好,但在hdfs环境中运行时失败。你知道我哪里做错了吗?

val files = new File("hdfs://172.X.X.X:8020/landing/").listFiles.map(_.getName).toList

files.foreach { file =>
print(file) 
val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + file)
event.show(false)
}

有人能纠正它或建议替代方案请。

mefy6pfw

mefy6pfw1#

您应该使用hadoop io库来处理hadoop文件。
代码:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark=SparkSession.builder().master("local[*]").getOrCreate()

val fs=FileSystem.get(new URI("hdfs://172.X.X.X:8020/"),spark.sparkContext.hadoopConfiguration)

fs.globStatus(new Path("/landing/*")).toList.foreach{
   f=>
   val event = spark.read.option("multiline", "true").json("hdfs://172.X.X.X:8020/landing/" + f.getPath.getName)
   event.show(false)
}

相关问题