pyspark将csv转换为mongodb中的嵌套json

kcrjzv8t  于 2021-05-26  发布在  Spark
关注(0)|答案(1)|浏览(383)
Input CSV Data
userid,      Code,      Status
1234,        1 ,        final
1287,       2,        notfinal

# Applied Pyspark Script

# Create Spark Session

    spark =  SparkSession.builder.master("yarn").appName().enableHiveSupport().config("spark.some.config.option", "some-value").getOrCreate()
    #read csv data into dataframe
    df =  spark.read.load("Book3.csv",format="csv", sep=",", inferSchema="true", header="true")
    #define schema for json df
    newschema = StructType([StructField("userid", StringType()),StructField("report", 
    StringType(),metadata={"maxlength":6000})])

    jsondf = df.rdd.map(lambda row: (row[0], ({"Code":row[1],"status" : row[2]})))\
    .map(lambda row: (row[0], json.dumps(row[1])))\
    .toDF(newschema) 

    jsondf.write.format("mongo").mode("append")\
    .option("uri","mongodb://gcp.mongodb.net/").option("database","dbname").option("collection", 
    "testcollection").save()

结果mongo数据

{
    "userid" : "1234",
    "report" : "{\"Code\": \"1\", \"status\": \"final\"}"
}
{
    "userid" : "1287",
    "report" : "{\"Code\": \"2\", \"status\": \"notfinal\"}"
}

在mongo中,我在“report”中得到一个完整的json编码字符串,这并不奇怪,因为我将report字段作为stringtype()。
这有效地使得mongo中任何基于嵌套字段的搜索都变得不可能,整个代码也就毫无用处了。
如何使它成为一个正确的嵌套json,以便mongo也可以搜索嵌套字段?
当我尝试使用下面的代码将字段更改为正确的结构化json时

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()

我得到一个错误“raiseattributeerror(item)attributeerror:json”
请帮助soem代码提示。。。
我也可以使用groupby,但是在聚合函数中放什么比较困难,我需要结果中的dataframe来写入mongo。

5hcedyr0

5hcedyr01#

解决方案是在pyspark“df\u schema”中正确定义schema,然后将基本dfMap到新的df“df\u mongo”,确保df.rdd.map应该遵循df\u schema中定义的模式。

df =  spark.read.load("sourcelocation",format="csv", sep="|", inferSchema="true", header="true")
df_schema = StructType([StructField("field1", StringType(),True),StructField("field2", StringType(),True)])            
df_mongo = df.rdd.map(lambda row: ([row[15],row[12]])).toDF(df_schema)
df_mongo.write.format("mongo").mode("append").option("uri",mongodb_uri). \ 
option("database",dbname).option("collection", collection_name).save()

相关问题