sc.textFile("") works in Eclipse but not in a jar

ljsrvy3e posted on 2021-05-29 in Hadoop

I'm writing code for a Hadoop cluster, but first I'm testing it locally against a local file. The code runs fine in Eclipse, but when I build an uber jar with sbt (bundling the Spark libraries, etc.), the program only runs until it reaches textFile(path). My code is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}
import org.joda.time.format.DateTimeFormat
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

object TestCRA2 {

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Test")
      .set("spark.driver.memory", "4g")
      .set("spark.executor.memory", "4g")
    val context = new SparkContext(conf)//.master("local")
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    // split(";") yields Array[String], so the return type must match
    def TimeParse1(path: String): RDD[Array[String]] = {
        val data = context.textFile(path).map(_.split(";"))
        data
    }

    def main(args: Array[String]) {

        val data = TimeParse1("file:///home/quentin/Downloads/CRA") 
    }
}

The error I get is:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:341)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1034)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1029)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1029)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:832)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:830)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.SparkContext.textFile(SparkContext.scala:830)
    at main.scala.TestCRA2$.TimeParse1(TestCRA.scala:37)
    at main.scala.TestCRA2$.main(TestCRA.scala:84)
    at main.scala.TestCRA2.main(TestCRA.scala)

I can't put my files inside the jar, because they live on the Hadoop cluster, and the same code works from Eclipse.
Here is my build.sbt:

name := "BloomFilters"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

libraryDependencies += "joda-time" % "joda-time" % "2.9.3"

assemblyMergeStrategy in assembly := {
 case PathList("META-INF", xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}

If I don't set assemblyMergeStrategy like this, I get a whole pile of merge errors.
In fact, I needed to change my build.sbt like this:

name := "BloomFilters"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"

libraryDependencies += "joda-time" % "joda-time" % "2.9.3"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) =>
    (xs map {_.toLowerCase}) match {
      case "services" :: xs => MergeStrategy.first
      case _ => MergeStrategy.discard
    }
  case x => MergeStrategy.first
}
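
To confirm that a rebuilt uber jar actually kept Hadoop's service registry, one can list the surviving copies of the registry file from inside the jar. A minimal diagnostic sketch (the object name FsServiceCheck is made up for illustration):

import scala.collection.JavaConverters._

// Hypothetical diagnostic: list every copy of the Hadoop FileSystem
// service registry that is still visible on the classpath.
object FsServiceCheck {
  def main(args: Array[String]): Unit = {
    val urls = getClass.getClassLoader
      .getResources("META-INF/services/org.apache.hadoop.fs.FileSystem")
      .asScala
    // In a correctly merged jar this prints at least one URL, and the file
    // behind it lists org.apache.hadoop.fs.LocalFileSystem among its entries.
    urls.foreach(println)
  }
}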

Thanks, @lyomi

dojqjjoe #1

Your sbt assembly is probably discarding some required files. Specifically, Hadoop's FileSystem class relies on finding every META-INF/services/org.apache.hadoop.fs.FileSystem file on the classpath.
It works in Eclipse because each jar carries its own copy of that file, but in the uber jar one copy overwrites the others, so the file: scheme ends up unrecognized.
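
To illustrate that lookup mechanism: in Hadoop 2.x, FileSystem resolves schemes through the JDK's ServiceLoader, which scans exactly those registry files. A small probe sketch (the object name Probe is made up for illustration) that prints which FileSystem implementations are actually discoverable:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

// Hypothetical probe: enumerate the FileSystem implementations whose
// service-registry entries survived the jar merge.
object Probe {
  def main(args: Array[String]): Unit = {
    val impls = ServiceLoader.load(classOf[FileSystem]).asScala
    // With hadoop-common's registry intact, the output includes
    // org.apache.hadoop.fs.LocalFileSystem, the handler for the file: scheme.
    impls.foreach(fs => println(fs.getClass.getName))
  }
}

If LocalFileSystem shows up when run from Eclipse but not from inside the assembled jar, the merge strategy is the culprit.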
In your sbt settings, add the following so that the service-discovery files are concatenated instead of some of them being discarded.

val defaultMergeStrategy: String => MergeStrategy = {
  case PathList("META-INF", xs @ _*) =>
    (xs map {_.toLowerCase}) match {
      // ... possibly other settings ...
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.deduplicate
    }
  case _ => MergeStrategy.deduplicate
}
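
The defaultMergeStrategy value above is how the README illustrates sbt-assembly's built-in default; in a build you would normally just override assemblyMergeStrategy directly. A minimal sketch, assuming the same sbt-assembly 0.14.x syntax already used in this question:

// Concatenate service registries line by line, drop the rest of META-INF,
// and take the first copy of any other duplicated file.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  case _                                         => MergeStrategy.first
}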

See sbt-assembly's README.md for more details.
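
As an aside, not from the original answer: a workaround sometimes used for this exact exception is to pin the scheme-to-class mapping explicitly in the Hadoop configuration, which bypasses the service-file lookup altogether. A sketch using the context value from the question's code:

// Alternative workaround sketch: register the file: scheme explicitly so the
// (possibly broken) merged service registry is never consulted for it.
context.hadoopConfiguration.set(
  "fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)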
