pyspark到mysql插入错误?

nsc4cvqm  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(218)

我正在学习pyspark,并编写了一个简单的脚本,从我的一个hdfs目录加载一些json文件,将每个文件作为python字典加载(使用json.loads()),然后为每个对象提取一些字段。
相关信息存储在spark数据框中,我想将这些数据插入mysql表(我在本地创建了这个表)。
但是,当我运行这个时,我的连接url出现了一个错误。
它表示“java.lang.runtimeexception:[1.5]失败:应为“.”,但找到“:”
此时:

jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd
    ^

数据库名为“bigdata”
上面包括用户名和密码
我相信端口号是正确的
这是我的完整剧本……:

import json 
import pandas as pd 
import numpy as np 
from pyspark import SparkContext 
from pyspark.sql import Row, SQLContext

SQL_CONNECTION="jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd"
sc=SparkContext()
sqlContext = SQLContext(sc)

cols = ['Title', 'Site']
df = pd.DataFrame(columns=cols)

# First, load my files as RDD and convert them as JSON

rdd1 = sc.wholeTextFiles("hdfs://localhost:8020/user/ashishu/Project/sample data/*.json")
rdd2 = rdd1.map(lambda kv: json.loads(kv[1])) 

# Read in the RDDs and do stuff

for record in rdd2.take(2):

    title = record['title']
    site = record['thread']['site_full']
    vals = np.array([title, site])
    df.loc[len(df)] = vals

sdf = sqlContext.createDataFrame(df)
sdf.show()
sdf.insertInto(SQL_CONNECTION, "sampledata")

sql\u connection是最开始的连接url,“sampledata”是我想在mysql中插入的表的名称。要使用的特定数据库是在连接url(“bigdata”)中指定的。
这是我的声明:

./bin/spark-submit /Users/ashishu/Desktop/sample.py --driver-class-path /Users/ashishu/Documents/Spark/.../bin/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar

我正在使用spark 1.6.1
我是不是漏掉了一些关于mysql连接的蠢事?我试着用“.”替换“:”(jdbc和mysql之间),但显然没有解决任何问题,并产生了不同的错误。。。
谢谢
编辑
我按照建议修改了代码,这样就不用使用sdf.insertinto了,我说。。。

sdf.write.jdbc(SQL_CONNECTION, table="sampledata", mode="append")

但是,在终端中使用下面的submit命令后,出现了一个新错误:

./bin/spark-submit sample.py --jars <path to mysql-connector-java-5.1.42-bin.jar>

错误基本上是说“调用o53.jdbc时出错,找不到合适的驱动程序”。
你知道这个吗?

5gfr0r5j

5gfr0r5j1#

我想,解决方案是在我的/spark/conf文件夹中创建一个spark-env.sh文件,并在其中设置如下:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/<path to your mysql connector jar file>

谢谢!

ui7jx7zq

ui7jx7zq2#

insertInto 期望 tablename 或者 database.tablename 这就是为什么它会扔 . expected but : found 错误。你需要的是 jdbc Dataframe编写器,即参见此处http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.dataframewriter.jdbc
像这样的-

sdf.write.jdbc(SQL_CONNECTION, table=bigdata.sampledata,mode='append')

相关问题