SparkSQL JDBC(PySpark)到Postgres -创建表和使用CTE

jhkqcmku  于 5个月前  发布在  Spark
关注(0)|答案(3)|浏览(58)

我正在做一个项目,将Python概念验证(POC)移植到PySpark。POC大量利用Postgres,特别是PostGIS地理空间库。大多数工作包括Python在回调数据进行最终处理之前向Postgres发出命令。
传递给Postgres的一些查询包含CREATE TABLEINSERTCREATE TEMP TABLE和CTE WITH语句。我正在尝试确定是否可以通过JDBC从Spark将这些查询传递给Postgres。
有人能确认这个功能是否可以在Spark JDBC中用于其他数据库吗?需要说明的是,我希望将简单的英语SQL查询传递给Postgres,而不是使用可用的SparkSQL API(因为他们不支持我需要的所有操作),我使用的是Spark 2.3.0PostgreSQL 10.11,和Python 2.7.5(是的,我知道Python 2的EOL,那是另一个故事)。
以下是我到目前为止所尝试的:

使用SparkSession.read

创建Spark会话到Postgres

postgres = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
    .getOrCreate()

字符串

定义传递给dbtable参数的查询

qry = """create table test (name varchar(50), age int)"""

qry传递给Postgres spark session对象的dbtable参数

postgres.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://....) \
    .option("dbtable", qry) \
    .option("user", configs['user']) \
    .option("password", configs['password']) \
    .option("driver", "org.postgresql.Driver") \
    .option("ssl", "true") \
    .load()


它返回以下语法错误(当使用上面列出的其他SQL命令时,也会产生相同类型的错误):

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
  Position: 15

使用SparkSession.sql()

利用上面定义的相同postgres对象

将查询传递给.sql()

postgres.sql("""create table (name varchar(50), age int)""")


它返回以下解析异常:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"


如果我像postgres.sql("(create table (name varchar(50), age int))")那样将查询用引号括起来,那么我会得到一个不同的解析异常,这让我相信我想要的功能是不可能的:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"


我的问题可以归结为:
1.我的方法是否缺少某种配置或其他必要的步骤?

  1. spark.sql() API可以在Postgres中使用吗?
    1.我想做的事有可能实现吗?
    我已经在互联网上搜索了试图找到使用SparkSQL向PostgreSQL发出这类SQL查询的例子,但没有找到任何解决方案。如果有解决方案,我希望看到一个例子,否则确认这是不可能的就足够了。
ygya80vv

ygya80vv1#

我想做的事有可能实现吗?
我会说没有。Spark是一个数据处理框架,因此它的API主要是为使用JavaScript的 * 读 * 和 * 写 * 操作而开发的。在你的例子中,你有一些JavaScript语句,Spark不应该执行这样的操作。
例如,第一个示例中的dbtable选项必须是一个表名或某个SELECT查询。
如果你需要运行一些SQL、DCL、TCL查询,那么你应该用其他方式,例如通过psycopg2模块。
spark.sql()API可以在Postgres中使用吗?
spark.sql是一个在SparkSession表或视图中注册的SparkSQL代码的方法。它适用于任何受支持的MySQL,不仅适用于jdbc,而且适用于Spark端的SparkSQL语法。例如

val spark = SparkSession
        ...
        .getOrCreate()

spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://ip/database_name")
  .option("dbtable", "schema.tablename")
  .load()
  .createOrReplaceTempView("my_spark_table_over_postgresql_table")

// and then you can operate with a view:
val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")

字符串

ckx4rj1h

ckx4rj1h2#

这不是最好的选择,但你可以使用SQL注入解决它。

spark.read
  .format("jdbc")
  .option("url", s"""jdbc:postgresql://8.8.8.8/dbname""")
  .option(
    "dbtable",
    "(select 1) a; CREATE OR REPLACE VIEW schema.view AS SELECT c1 FROM schema.table WHERE c1 in ('C', 'H'); select * from (select 1) a"
  )
  .load()

Spark返回一个错误,但视图正在创建:
org.postgresql.util.PSQLException:查询返回了多个ResultSets。
我的解决方案适用于Spark 2.3.0,在Spark 3.2.0中有比黑客“dbtable”更好的选项,你可以使用名为“query”的字段。

t5zmwmid

t5zmwmid3#

accepted answer很棒,很好地解释了这种情况。另一种思考的方式是Spark是一个数据处理框架,处理延迟执行。一切都是转换或动作。为了实现我们需要执行动作。现在记住这一点,你能执行手头的任务吗?即使有一些黑客,但这对Spark框架来说是自然的吗?此外,黑客可能会导致意想不到的行为,我们应该注意这一点。
要总结spark语句(CREATE、ALTER、DROP、GRANT、REVOKE等)是不可能从spark执行的。

相关问题