Cannot read HBase data with Spark in cluster mode

8aqjt8rx  posted 2021-05-27 in Spark

Hadoop: CDH 6.2.1
Spark: 2.4.0
HBase: 2.0
My job: read HBase data via Spark.
Everything works when I run it from IntelliJ in local mode, but when I submit with spark-submit --master yarn, the following stack trace appears:

20/05/20 11:00:46 ERROR mapreduce.TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:221)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:200)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:243)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
    ... 27 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:1938)
    at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:310)
    ... 32 more

20/05/20 11:00:46 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:254)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:558)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:249)
    ... 24 more

Here is my code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf: SparkConf = new SparkConf().setAppName("spark1")
val spark = new SparkContext(conf)

// HBase connection settings and the table to scan
val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "idx_name")
hbaseConf.set("hbase.defaults.for.version.skip", "true")

// Read the table as an RDD of (row key, row) pairs via TableInputFormat
val rdd: RDD[(ImmutableBytesWritable, Result)] = spark.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

1wnzp6jl 1#

This is an HBase classpath problem on the cluster: you need to add the HBase jars to the classpath, for example like this:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
`hbase classpath` will print all the jars needed for the HBase connection, etc.
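On Spark 2.x, `SPARK_CLASSPATH` is deprecated; the same effect is usually achieved with the `extraClassPath` settings at submit time. Below is a minimal sketch of such an invocation, assuming the HBase parcels/jars live at the same paths on every node; the class name is taken from the stack trace above, while the application jar name is hypothetical.

```bash
# Illustrative sketch: push the HBase classpath to both driver and executors.
# Assumes `hbase classpath` resolves to the same local paths on all cluster nodes.
HBASE_CP=$(hbase classpath)

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.driver.extraClassPath="$HBASE_CP" \
  --conf spark.executor.extraClassPath="$HBASE_CP" \
  --class com.song.HbaseOnSpark1 \
  hbase-on-spark1.jar   # application jar name is hypothetical
```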

#### Why does it work in local mode?

Because all the required jars are already on the IDE's classpath.
If you use Maven, run `mvn dependency:tree` to see which jars are needed on the cluster, and adjust your `spark-submit` script accordingly.
If you are using the `--jars` option, check that every jar is actually passed; if you build an uber jar, check that it packages the right dependencies (see the sketch below).
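As a hedged illustration of the `--jars` route mentioned above: `--jars` expects a comma-separated list of files, so a jar directory has to be flattened first. The CDH parcel directory and the application jar name below are assumptions, used only for the example.

```bash
# Illustrative sketch: build a comma-separated list for --jars from a lib directory.
# The directory below is a typical CDH parcel location, used here only as an example.
HBASE_JARS=$(ls /opt/cloudera/parcels/CDH/lib/hbase/lib/*.jar | tr '\n' ',' | sed 's/,$//')

spark-submit \
  --master yarn \
  --jars "$HBASE_JARS" \
  --class com.song.HbaseOnSpark1 \
  hbase-on-spark1.jar   # application jar name is hypothetical
```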

#### There may also be jar conflicts; compare carefully against the local-mode environment, since that is where it works.

Further reading: spark-submit's `--jars` argument wants a comma-separated list; how do I declare a directory of jars?
