Problem connecting HBase and Spark with "shc-core:1.1.1-2.1-s_2.11" on an AWS EMR (emr-5.28.1) cluster

Asked by wvt8vs2t on 2021-06-07 in HBase

Good morning,
I am running into a problem connecting Apache Spark to HBase on an AWS EMR cluster. I believe it is caused by an incompatibility between "shc-core" and "json4s": even after changing the versions of Spark, Scala, and shc-core, I keep hitting the same error.

Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
    at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at Dataflow$.main(Dataflow.scala:56)
    at Dataflow.main(Dataflow.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The build.sbt file is configured as follows:

name := "hive_spark_hbase"

version := "1.0"

scalaVersion := "2.11.12"
val sparkVersion = "2.3.4"

resolvers += "jar_hortonworks" at "http://repo.hortonworks.com/content/groups/public"

libraryDependencies ++= Seq(
  "org.scala-lang"   %  "scala-library"        % scalaVersion.value,
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-mllib"          % sparkVersion,
  "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11"

)
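
For reference, since the NoSuchMethodError above points at org.json4s.jackson.JsonMethods.parse, the conflict is most likely two different json4s versions on the classpath. A minimal sbt-side sketch of pinning json4s would look like the line below; the version "3.2.11" is an assumption (roughly what Spark 2.x-era builds shipped), not something I have verified against shc-core 1.1.1-2.1-s_2.11:

// Sketch only: force a single json4s version across spark-sql and shc-core.
// The version "3.2.11" is an assumed value, not a confirmed requirement.
dependencyOverrides += "org.json4s" %% "json4s-jackson" % "3.2.11"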

The main class I use to load the data into HBase:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// The field names must match the column names declared in the SHC catalog below.
case class Employee(key: String, fName: String, lName: String, mName: String,
                    addressLine: String, city: String, state: String, zipCode: String)

object Dataflow {

  def main(args: Array[String]): Unit = {

        val catalog=
        s"""{
    |"table":{"namespace":"default","name":"employee"},
        |"rowkey":"key",
        |"columns":{
        |"key":{"cf":"rowkey","col":"key","type":"string"},
        |"fName":{"cf":"person","col":"firstName","type":"string"},
        |"lName":{"cf":"person","col":"lastName","type":"string"},
        |"mName":{"cf":"person","col":"middleName","type":"string"},
        |"addressLine":{"cf":"address","col":"addressLine","type":"string"},
        |"city":{"cf":"address","col":"city","type":"string"},
        |"state":{"cf":"address","col":"state","type":"string"},
        |"zipCode":{"cf":"address","col":"zipCode","type":"string"}
        |}
    |}""".stripMargin

        val data=Seq(
                      Employee("1","Abby","Smith","K","3456main","Orlando","FL","45235"),
                      Employee("2","Amaya","Williams","L","123Orange","Newark","NJ","27656"),
                      Employee("3","Alchemy","Davis","P","Warners","Sanjose","CA","34789")
                    )

        val master = "yarn"
        val appName = "MyApp"
        val conf: SparkConf = new SparkConf()
                .setMaster(master)
                .setAppName(appName)
                .set("spark.driver.allowMultipleContexts", "false")
                .set("spark.ui.enabled", "false")

        val spark = SparkSession.builder().config(conf).getOrCreate()

        import spark.implicits._

        val df=spark.sparkContext.parallelize(data).toDF

        // Write the DataFrame to HBase through SHC; "newTable" asks SHC to create
        // the table with the given number of regions if it does not exist yet.
        df.write.options(
                Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "4"))
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .save()

  }

}
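
For completeness, reading the table back through SHC uses the same catalog; a minimal sketch, reusing the spark session and catalog defined above and assuming the write succeeds:

// Sketch: read the "employee" table back through SHC with the same catalog.
val readDF = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

readDF.show(truncate = false)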

And finally, this is how the job is submitted:

spark-submit  --class Dataflow --master yarn --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/ --files /etc/hbase/conf/hbase-site.xml /home/hadoop/spark_hbase/target/scala-2.11/hive_spark_hbase_2.11-1.0.jar
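
For reference, a variant of the same command that puts the application's jars ahead of the cluster-provided ones is shown below; spark.driver.userClassPathFirst and spark.executor.userClassPathFirst are standard (experimental) Spark properties, but I have not confirmed that they resolve this particular conflict on EMR:

spark-submit --class Dataflow --master yarn \
  --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  --files /etc/hbase/conf/hbase-site.xml \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  /home/hadoop/spark_hbase/target/scala-2.11/hive_spark_hbase_2.11-1.0.jar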

Thank you very much.

No answers yet!

