Insert into a Hive table from Spark code fails

tquggr8v · posted 2021-06-26 · tagged Hive
HIVE version = 1.1.0
SPARK version = 1.5.1
Distribution = Cloudera CDH 5.5

I am trying to insert one record into a Hive table from Spark code, once per run of the Spark job.
When the getupdatedVersion method is called, I can see that the Hive external table gets created, but the insert into that table fails with the error below:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Unsupported language features in query: INSERT INTO Ideal.events_log VALUES ('file_date_2017-04-05','omega_2017-04-05.csv','2017-05-15 17:54:58',1)
TOK_QUERY 0, 0,18, 0
TOK_FROM 0, -1,18, 0
TOK_VIRTUAL_TABLE 0, -1,18, 0
  TOK_VIRTUAL_TABREF 0, -1,-1, 0
    TOK_ANONYMOUS 0, -1,-1, 0
  TOK_VALUES_TABLE 1, 8,18, 42
    TOK_VALUE_ROW 1, 10,18, 42
      'file_date_2017-04-05' 1, 11,11, 42
      'omega_2017-04-05.csv' 1, 13,13, 65
      '2017-05-15 17:54:58' 1, 15,15, 98
      1 1, 17,17, 120
 TOK_INSERT 1, 0,-1, 12
  TOK_INSERT_INTO 1, 0,6, 12
  TOK_TAB 1, 4,6, 12
    TOK_TABNAME 1, 4,6, 12
      Ideal 1, 4,4, 12
      events_log 1, 6,6, 23
TOK_SELECT 0, -1,-1, 0
  TOK_SELEXPR 0, -1,-1, 0
    TOK_ALLCOLREF 0, -1,-1, 0

 scala.NotImplementedError: No parse rules for:
 TOK_VIRTUAL_TABLE 0, -1,18, 0
 TOK_VIRTUAL_TABREF 0, -1,-1, 0
 TOK_ANONYMOUS 0, -1,-1, 0
 TOK_VALUES_TABLE 1, 8,18, 42
 TOK_VALUE_ROW 1, 10,18, 42
  'file_date_2017-04-05' 1, 11,11, 42
  'omega_2017-04-05.csv' 1, 13,13, 65
  '2017-05-15 17:54:58' 1, 15,15, 98
  1 1, 17,17, 120
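The parse tree in the error points at `TOK_VIRTUAL_TABLE` / `TOK_VALUES_TABLE`: the HiveQL dialect bundled with Spark 1.5 has no parse rule for a VALUES virtual table, so any `INSERT INTO ... VALUES (...)` statement is rejected before it runs. As a sketch (assuming a `HiveContext` named `sqlContext`, as in the code further down), the same row can usually be phrased as an `INSERT INTO TABLE ... SELECT` of literals, a form the 1.5 parser does accept:

```scala
// Sketch: the failing VALUES row rewritten as INSERT INTO TABLE ... SELECT.
// Spark 1.5's HiveQL parser handles SELECTs of literals, but not VALUES.
sqlContext.sql(
  """INSERT INTO TABLE Ideal.events_log
    |SELECT 'file_date_2017-04-05', 'omega_2017-04-05.csv', '2017-05-15 17:54:58', 1
    |""".stripMargin)
```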

Here is the structure of the Hive table:

CREATE EXTERNAL TABLE `events_log`(
  `file_date` string,
  `file_name` string,
  `audit_timestamp` string,
  `version` int)
 ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION
  'hdfs://nameservice1//data/staged/Ideal/omega/events_log'

My Spark code:

def getupdatedVersion(sc: SparkContext, sqlContext: HiveContext, omega_file_date: String, omega_source_file_name: String, config: Config): Int = {

  sqlContext.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Ideal.events_log(file_date String, file_name String, audit_timestamp String, version int) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," LOCATION "/data/staged/Ideal/omega/events_log" """)

  val versionDF = sqlContext.sql(s"""SELECT MAX(version) AS version FROM Ideal.events_log WHERE file_date = '$omega_file_date' """)
  var version = versionDF.map(row => row.getInt(0)).first()

  if (version >= 0) {
    version = version + 1
    sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """)
  } else {
    version = 0
    sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """)
  }

  version
}

Can anyone help me resolve this issue?

Answer #1 (xe55xuns)

Updated answer: I tried the code below and it works, but every insert creates a new file, so I will go with Samson's suggestion instead.

def getupdatedVersion(sc: SparkContext, sqlContext: HiveContext, omega_file_date: String, omega_source_file_name: String, config: Config): Int = {

  sqlContext.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Ideal.events_log(file_date String, file_name String, audit_timestamp String, version int) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," LOCATION "/data/staged/Ideal/omega/events_log" """)

  val versionDF = sqlContext.sql(s"""SELECT MAX(version) AS version FROM Ideal.events_log WHERE file_date = '$omega_file_date' """)
  var version = versionDF.map(row => row.getInt(0)).first()

  if (version >= 0) {
    version = version + 1
    // The VALUES form fails to parse in Spark 1.5:
    // sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """)
    // so build the row with a SELECT of literals and append it via insertInto:
    val insertDF = sqlContext.sql(s"""SELECT "$omega_file_date" AS omega_file_date, "$omega_source_file_name" AS omega_source_file_name, "$now" AS audit_timestamp, "$version" AS version """)
    insertDF.write.mode("append").insertInto("<dbname>.events_log")
  } else {
    version = 0
    // sqlContext.sql(s"""INSERT INTO Ideal.events_log VALUES ('$omega_file_date','$omega_source_file_name','$now',$version) """)
    val insertDF = sqlContext.sql(s"""SELECT "$omega_file_date" AS omega_file_date, "$omega_source_file_name" AS omega_source_file_name, "$now" AS audit_timestamp, "$version" AS version """)
    insertDF.write.mode("append").insertInto("<dbname>.events_log")
  }

  version
}
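An alternative sketch that avoids the SELECT-of-literals indirection (assumptions: the same `sc`, `sqlContext`, `now`, and parameter names as in the code above) is to build the single-row DataFrame directly with `createDataFrame` and an explicit schema, then append it with `insertInto`:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Schema mirroring the events_log table definition.
val schema = StructType(Seq(
  StructField("file_date", StringType),
  StructField("file_name", StringType),
  StructField("audit_timestamp", StringType),
  StructField("version", IntegerType)))

// Build the single audit row in memory instead of via a SELECT of literals.
val row = Row(omega_file_date, omega_source_file_name, now, version)
val insertDF = sqlContext.createDataFrame(sc.parallelize(Seq(row)), schema)

// insertInto matches columns by position against the Hive table definition.
insertDF.write.mode("append").insertInto("Ideal.events_log")
```

Note that each `insertInto` call still writes a new file under the table location; if the job runs frequently, batching several rows per write or periodically compacting the table would keep the small-file count down.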
