我正在学习pyspark,并编写了一个简单的脚本,从我的一个hdfs目录加载一些json文件,将每个文件作为python字典加载(使用json.loads()),然后为每个对象提取一些字段。
相关信息存储在spark数据框中,我想将这些数据插入mysql表(我在本地创建了这个表)。
但是,当我运行这个时,我的连接url出现了一个错误。
它表示“java.lang.runtimeexception:[1.5]失败:应为“.”,但找到“:”
此时:
jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd
^
数据库名为“bigdata”
上面包括用户名和密码
我相信端口号是正确的
这是我的完整剧本……:
import json
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
SQL_CONNECTION="jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd"
sc=SparkContext()
sqlContext = SQLContext(sc)
cols = ['Title', 'Site']
df = pd.DataFrame(columns=cols)
# First, load my files as RDD and convert them as JSON
rdd1 = sc.wholeTextFiles("hdfs://localhost:8020/user/ashishu/Project/sample data/*.json")
rdd2 = rdd1.map(lambda kv: json.loads(kv[1]))
# Read in the RDDs and do stuff
for record in rdd2.take(2):
title = record['title']
site = record['thread']['site_full']
vals = np.array([title, site])
df.loc[len(df)] = vals
sdf = sqlContext.createDataFrame(df)
sdf.show()
sdf.insertInto(SQL_CONNECTION, "sampledata")
sql\u connection是最开始的连接url,“sampledata”是我想在mysql中插入的表的名称。要使用的特定数据库是在连接url(“bigdata”)中指定的。
这是我的声明:
./bin/spark-submit /Users/ashishu/Desktop/sample.py --driver-class-path /Users/ashishu/Documents/Spark/.../bin/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar
我正在使用spark 1.6.1
我是不是漏掉了一些关于mysql连接的蠢事?我试着用“.”替换“:”(jdbc和mysql之间),但显然没有解决任何问题,并产生了不同的错误。。。
谢谢
编辑
我按照建议修改了代码,这样就不用使用sdf.insertinto了,我说。。。
sdf.write.jdbc(SQL_CONNECTION, table="sampledata", mode="append")
但是,在终端中使用下面的submit命令后,出现了一个新错误:
./bin/spark-submit sample.py --jars <path to mysql-connector-java-5.1.42-bin.jar>
错误基本上是说“调用o53.jdbc时出错,找不到合适的驱动程序”。
你知道这个吗?
2条答案
按热度按时间5gfr0r5j1#
我想,解决方案是在我的/spark/conf文件夹中创建一个spark-env.sh文件,并在其中设置如下:
谢谢!
ui7jx7zq2#
insertInto
期望tablename
或者database.tablename
这就是为什么它会扔. expected but : found
错误。你需要的是jdbc
Dataframe编写器,即参见此处http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.dataframewriter.jdbc像这样的-