在 CDH 6.3.3(Spark 2.4.0、Scala 2.11、HBase 2.1.0)环境下将 DataFrame 写入 HBase 时遇到问题

v9tzhpje  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(196)

我在 spark-submit 中使用了以下 jar:

--jars $(pwd)/commons-httpclient-3.1.jar,
         $(pwd)/postgresql-42.2.5.jar,
         $(pwd)/hbase-client-2.1.0.jar,
         $(pwd)/hbase-spark-1.0.0.jar,
         $(pwd)/shc-core-1.1.3-2.4-s_2.11.jar,
         $(pwd)/hbase-0.94.21.jar,
         $(pwd)/hbase-common-2.1.0.jar,
         $(pwd)/hbase-protocol-shaded-2.1.10.jar,
         $(pwd)/hbase-protocol-2.1.0.jar,
         $(pwd)/hbase-server-2.1.0.jar,
         $(pwd)/hadoop-client-2.5.0.jar,
         $(pwd)/htrace-core-3.2.0-incubating.jar,
         $(pwd)/hbase-shaded-miscellaneous-2.1.0.jar

出现以下错误:
Caused by: java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.Bytes.createMaxByteArray(I)[B
下面是我的代码片段:

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
      * Spark job that reads rows over JDBC and writes them to the HBase table
      * `default:address_mdm`.
      *
      * The connection settings `dburl`, `dbusername`, `dbpwd` and `driver` are
      * referenced here but not defined in this object; they must be in scope
      * where this object is compiled.
      */
    object Main {

      /**
        * Fetch data from Postgres and return it as a DataFrame.
        *
        * @param sparkSession session used to create the JDBC reader
        * @return the rows produced by the subquery passed as the `dbtable` option
        */
      def fetch(sparkSession: SparkSession): DataFrame = {

        // The "dbtable" option takes a parenthesised subquery followed by an alias.
        // The original literal contained a stray quote that split the string in two
        // and prevented the file from compiling.
        val query = "(select  * from table) srceinfo"

        val dbOptions: Map[String, String] = Map(
          "url" -> dburl,
          "dbtable" -> query,
          "user" -> dbusername,
          "password" -> dbpwd,
          "driver" -> driver
        )

        // The last expression is the result; no explicit `return` is needed.
        sparkSession.read.format("jdbc").options(dbOptions).load()
      }

      /**
        * Write the given DataFrame to HBase using the catalog defined below.
        *
        * The catalog maps `address_id` to the row key, `address_line_1` and `city`
        * to column family `address_details`, and `test_id` to column family
        * `location_details`.
        *
        * @param data rows to persist; expected to contain the columns named in the catalog
        */
      def saveToHbase(data: DataFrame): Unit = {
        // Plain multi-line literal: nothing is interpolated, so no `s` prefix is needed.
        val schema =
          """{
            |"table":{"namespace":"default","name":"address_mdm"},
            |"rowkey":"address_id",
            |"columns":{
            |"address_id":{"cf":"rowkey","col":"address_id","type":"integer"},
            |"address_line_1":{"cf":"address_details","col":"address_line_1","type":"string"},
            |"city":{"cf":"address_details","col":"city","type":"string"},
            |"test_id":{"cf":"location_details","col":"test_id","type":"string"}
            |}
            |}""".stripMargin

        val hbaseOptions = Map(
          HBaseTableCatalog.tableCatalog -> schema,
          // NOTE(review): presumably the number of regions to create when the table
          // does not exist yet - confirm against the connector in use.
          HBaseTableCatalog.newTable -> "4"
        )

        // NOTE(review): this format string names the SHC data source, while the
        // imported HBaseTableCatalog comes from the hbase-spark package
        // (org.apache.hadoop.hbase.spark.datasources). Confirm which connector is
        // intended and keep only that connector's jar on the classpath.
        data.write.options(hbaseOptions)
          .format("org.apache.spark.sql.execution.datasources.hbase")
          .save()
      }

      /**
        * Build (or reuse) the SparkSession for this application.
        *
        * @return a session whose application name is "DataMigrator"
        */
      def getSession(): SparkSession =
        SparkSession.builder().appName("DataMigrator").getOrCreate()

      /**
        * Entry point: read the source rows over JDBC and store them in HBase.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {

        val sparkSession = getSession()
        try {
          val data = fetch(sparkSession)
          saveToHbase(data)
        } finally {
          // Release the session even when the read or the write fails.
          sparkSession.stop()
        }

      }

    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题