Input CSV Data
userid, Code, Status
1234, 1 , final
1287, 2, notfinal
# Applied Pyspark Script
# Create Spark Session
spark = SparkSession.builder.master("yarn").appName().enableHiveSupport().config("spark.some.config.option", "some-value").getOrCreate()
#read csv data into dataframe
df = spark.read.load("Book3.csv",format="csv", sep=",", inferSchema="true", header="true")
#define schema for json df
newschema = StructType([StructField("userid", StringType()),StructField("report",
StringType(),metadata={"maxlength":6000})])
jsondf = df.rdd.map(lambda row: (row[0], ({"Code":row[1],"status" : row[2]})))\
.map(lambda row: (row[0], json.dumps(row[1])))\
.toDF(newschema)
jsondf.write.format("mongo").mode("append")\
.option("uri","mongodb://gcp.mongodb.net/").option("database","dbname").option("collection",
"testcollection").save()
结果mongo数据
{
"userid" : "1234",
"report" : "{\"Code\": \"1\", \"status\": \"final\"}"
}
{
"userid" : "1287",
"report" : "{\"Code\": \"2\", \"status\": \"notfinal\"}"
}
在mongo中,我在“report”中得到一个完整的json编码字符串,这并不奇怪,因为我将report字段作为stringtype()。
这有效地使得mongo中任何基于嵌套字段的搜索都变得不可能,整个代码也就毫无用处了。
如何使它成为一个正确的嵌套json,以便mongo也可以搜索嵌套字段?
当我尝试使用下面的代码将字段更改为正确的结构化json时
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
我得到一个错误“raiseattributeerror(item)attributeerror:json”
请帮助soem代码提示。。。
我也可以使用groupby,但是在聚合函数中放什么比较困难,我需要结果中的dataframe来写入mongo。
1条答案
按热度按时间5hcedyr01#
解决方案是在pyspark“df\u schema”中正确定义schema,然后将基本dfMap到新的df“df\u mongo”,确保df.rdd.map应该遵循df\u schema中定义的模式。