如何在databricks dbfs**中列出文件密钥,而不使用**dbutils

qij5mzcb  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(356)

显然dbutils不能在cmd行spark submits中使用,您必须使用jar作业,但是由于其他要求,我必须使用spark submit样式的作业,但是仍然需要列出dbfs中的文件键并对其进行迭代,以决定将哪些文件用作进程的输入。。。
使用scala,我可以使用spark或hadoop中的哪个lib来检索 dbfs:/filekeys 一种特殊的模式?

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
  println(s"FileUtils.ls path: $inputDir")
  val path = new Path(inputDir)
  val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
  val fileStatuses = fs.listStatus(path)
  fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}

使用上面的方法,如果我传入一个部分键前缀 dbfs:/mnt/path/to/folder 当所述“文件夹”中存在以下键时:
/mnt/path/to/folder/file1.csv /mnt/path/to/folder/file2.csv 我明白了 dbfs:/mnt/path/to/folder is not a directory 当它击中 val path = new Path(inputDir)

xzlaal3s

xzlaal3s1#

需要使用sparksession来完成。
我们是这样做的:

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def getFileSystem(sparkSession: SparkSession): FileSystem =
    FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
  getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}

相关问题