Reading an Impala table with Spark SQL

lawou6xi · posted 2021-06-26 in Hive

I am trying to execute a query that uses functions like LEAD .. OVER .. PARTITION BY and UNION. When I run this query on Impala it works fine, but it fails on Hive.
I need to write a Spark job that executes this query. It fails in Spark SQL as well; my assumption is that this is because Spark 1.6 internally uses HiveQL for the task above.
Is there a different way to read Impala tables from Spark SQL? Basic queries that work in Hive also work fine in Spark SQL.
FYR, the query I am trying to run:

SELECT issue_id,
  CASE WHEN COALESCE(lead(created, 1) OVER (PARTITION BY issue_id ORDER BY created ASC,
  field_sequence ASC), '') = '' THEN 'to' ELSE LEAD('from', 1) OVER (PARTITION BY issue_id ORDER BY created ASC, field_sequence ASC) END Status,
  created StartDate,
  LEAD(created, 1) OVER (PARTITION BY issue_id ORDER BY created ASC, field_sequence ASC) EndDate
FROM  (
    SELECT issue_id, created, field, 'from', 'to', field_sequence FROM tab1 WHERE COALESCE(LOWER(field), '') = 'status'
    UNION
    SELECT issue_id, updated_date created, '' field, '' 'from', '' 'to', 0 field_sequence FROM tab2
) hc WHERE hc.issue_id = '123'

And the error message:

Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/pyspark/sql/context.py", line 580, in sql
        return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
      File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
      File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
        return f(*a,**kw)
      File "/opt/cloudera/parcels/<CDHVersion>/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o83.sql.
    : java.lang.RuntimeException: [1.55] failure: ``)'' expected but identifier OVER found

    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
    at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
    at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
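
Note: the traceback shows the statement going through SQLContext.sql and Spark's default Catalyst parser (DefaultParserDialect). In Spark 1.6 that parser does not understand window functions, which is why it rejects the OVER keyword; expressions like LEAD(...) OVER (...) only parse under a HiveContext. A minimal PySpark 1.6 sketch, assuming the tables are reachable through the Hive metastore (the app name is illustrative):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="impala-table-query")  # illustrative app name
sqlContext = HiveContext(sc)  # HiveContext, not SQLContext: its parser
                              # accepts LEAD(...) OVER (PARTITION BY ...)

# Tables registered in the Hive metastore (including tables created via
# Impala, which shares that metastore) can be queried directly:
df = sqlContext.sql("""
    SELECT issue_id, created,
           LEAD(created, 1) OVER (PARTITION BY issue_id
                                  ORDER BY created ASC, field_sequence ASC) AS EndDate
    FROM tab1
""")
df.show()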

e3bfsja2 (answer 1)

You are missing an AS when defining Status, and a couple of commas are missing in the last SELECT statement. Also, the COALESCE is useless; you can use IF / ELSE instead, since there is only one case.
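
Concretely, the first two points might look like the snippet below. This is a sketch only: `from` is assumed to be a real column (backtick-quoted because it is a reserved word), and created is assumed never to be an empty string, so an IS NULL test can stand in for the COALESCE comparison.

# Hypothetical fix for the Status column: an explicit AS alias, and Hive's
# IF(cond, then, else) in place of CASE WHEN COALESCE(...) = '' THEN ... END.
status_expr = """
IF(LEAD(created, 1) OVER (PARTITION BY issue_id
                          ORDER BY created ASC, field_sequence ASC) IS NULL,
   'to',
   LEAD(`from`, 1) OVER (PARTITION BY issue_id
                         ORDER BY created ASC, field_sequence ASC)) AS Status
"""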
You should break your computation down so that you don't have nested SELECTs; they are inefficient. A two-step sketch follows.
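
One way to flatten the nested SELECT is to materialize the union as a temporary table and filter early, then run the window functions in a separate, flat query. Again a sketch, continuing the snippets above: the temp table name issue_history is made up, UNION ALL stands in for the original UNION (add an explicit DISTINCT if duplicate rows must be removed), and sqlContext is the HiveContext from the first sketch.

# Step 1: register the filtered union once instead of nesting it as a
# subquery. (COALESCE(LOWER(field), '') = 'status' simplifies to
# LOWER(field) = 'status', since NULL never compares equal to 'status'.)
sqlContext.sql("""
    SELECT issue_id, created, field, `from`, `to`, field_sequence
    FROM tab1
    WHERE LOWER(field) = 'status' AND issue_id = '123'
    UNION ALL
    SELECT issue_id, updated_date AS created, '' AS field,
           '' AS `from`, '' AS `to`, 0 AS field_sequence
    FROM tab2
    WHERE issue_id = '123'
""").registerTempTable("issue_history")

# Step 2: a flat query over the temp table, reusing the corrected Status
# expression from the previous snippet.
result = sqlContext.sql("""
    SELECT issue_id,
           {status},
           created AS StartDate,
           LEAD(created, 1) OVER (PARTITION BY issue_id
                                  ORDER BY created ASC, field_sequence ASC) AS EndDate
    FROM issue_history
""".format(status=status_expr))
result.show()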
