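I am trying to parse a JSON structure that is dynamic in nature and load it into a database, but I am having trouble because the JSON contains dynamic keys. My sample JSON is below. I tried the explode function, but it did not help. Something very similar is described here: How to parse dynamic JSON keys in a nested JSON result?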
{
  "_id": {
    "planId": "5f34dab0c661d8337097afb9",
    "version": {
      "$numberLong": "1"
    },
    "period": {
      "name": "3Q20",
      "startDate": 20200629,
      "endDate": 20200927
    },
    "line": "b443e9c0-fafc-4791-87c9-8e32339c7f3c",
    "channelId": "G7k5_-HWRIuF0-afe7q-rQ"
  },
  "unitRates": {
    "units": {
      "$numberLong": "0"
    },
    "rate": 0.0,
    "rcRate": 0.0
  },
  "demoValues": {
    "66": {
      "cpm": 0.0,
      "cpp": 0,
      "vpvh": 0.0,
      "imps": 0.0,
      "rcImps": 0.0,
      "ue": 0.0,
      "grps": 0.0,
      "demoId": "66"
    },
    "63": {
      "cpm": 0.0,
      "cpp": 0,
      "vpvh": 0.0,
      "imps": 0.0,
      "rcImps": 0.0,
      "ue": 0.0,
      "grps": 0.0,
      "demoId": "63"
    },
    "21": {
      "cpm": 0.0,
      "cpp": 0,
      "vpvh": 0.0,
      "imps": 0.0,
      "rcImps": 0.0,
      "ue": 0.0,
      "grps": 0.0,
      "demoId": "21"
    }
  },
  "hh-imps": 0.0
}
Below is my Scala code:
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

object ParseDynamic_v2 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    // Sample document wrapped in a single-element Dataset[String]
    val jsonStringDs = spark.createDataset[String](
      Seq(
        ("""{"_id" : {"planId" : "5f34dab0c661d8337097afb9","version" : {"$numberLong" : "1"},"period" : {"name" : "3Q20","startDate" : 20200629,"endDate" : 20200927},"line" : "b443e9c0-fafc-4791-87c9-8e32339c7f3c","channelId" : "G7k5_-HWRIuF0-afe7q-rQ"},"unitRates" : {"units" : {"$numberLong" : "0"},"rate" : 0.0,"rcRate" : 0.0},"demoValues" : {"66" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "66"},"63" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "63"},"21" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "21"}},"hh-imps" : 0.0}""")
      ))
    jsonStringDs.show

    // Schema inference turns every key under demoValues into its own struct column
    val df = spark.read.json(jsonStringDs)
    df.show(false)
    val app = df.select("demoValues.*")
    app.createOrReplaceTempView("app")
    app.printSchema
    app.show(false)

    val verticaProperties: Map[String, String] = Map(
      "db" -> "dbname", // Database name
      "user" -> "user", // Database username
      "password" -> "****", // Password
      "table" -> "tablename", // vertica table name
      "dbschema" -> "public", // schema of vertica where the table will be residing
      "host" -> "localhost", // Host on which vertica is currently running
      "hdfs_url" -> "hdfs://localhost:8020/user/hadoop/planheader/", // HDFS directory url in which intermediate orc file will persist before sending it to vertica
      "web_hdfs_url" -> "webhdfs://localhost:50070/user/hadoop/planheader/"
    )
    val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"

    // batch write mode
    df.write.format(verticaDataSource).options(verticaProperties).mode("overwrite").save()

    // streaming write mode
    val saveToVertica: DataFrame => Unit =
      dataFrame =>
        dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save()
    val checkpointLocation = "/user/hadoop/planheader/checkpoint"
    // NOTE: df is a batch DataFrame here; writeStream only works on a streaming
    // DataFrame (one created with spark.readStream), so this part fails as written.
    val streamingQuery = df.writeStream
      .outputMode(OutputMode.Append)
      .option("checkpointLocation", checkpointLocation)
      //.trigger(ProcessingTime("25 seconds"))
      .foreachBatch((ds: DataFrame, _: Long) => saveToVertica(ds)).start()
    streamingQuery.awaitTermination()
  }
}
Expected output:
2 Answers
rpppsulh #1
I don't know how efficient my code is, but it does the job.
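The code that accompanied this answer is not reproduced here. As a rough sketch only, and not necessarily what the answerer wrote, one way to handle the dynamic keys under demoValues in Spark is to declare that field as a map in an explicit schema and then explode the map into one row per key. The object name DemoValuesToRows, the helper toRows, the shortened sample document, and the output column names planId, demoKey and demo are all made up for this illustration; only the field names come from the sample JSON in the question.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{DoubleType, MapType, StringType, StructField, StructType}

// Hypothetical helper object, for illustration only.
object DemoValuesToRows {

  // Shape of one entry under "demoValues" (field names taken from the sample JSON).
  val demoSchema: StructType = StructType(Seq(
    StructField("cpm", DoubleType),
    StructField("cpp", DoubleType),
    StructField("vpvh", DoubleType),
    StructField("imps", DoubleType),
    StructField("rcImps", DoubleType),
    StructField("ue", DoubleType),
    StructField("grps", DoubleType),
    StructField("demoId", StringType)
  ))

  // Only the fields used below are declared; anything else in the document is ignored.
  // demoValues is a map, so its keys ("66", "63", ...) do not have to be known up front.
  val docSchema: StructType = StructType(Seq(
    StructField("_id", StructType(Seq(StructField("planId", StringType)))),
    StructField("demoValues", MapType(StringType, demoSchema))
  ))

  // Shortened version of the sample document, kept on one line.
  val sample: String =
    """{"_id":{"planId":"5f34dab0c661d8337097afb9"},"demoValues":{"66":{"cpm":0.0,"cpp":0,"vpvh":0.0,"imps":0.0,"rcImps":0.0,"ue":0.0,"grps":0.0,"demoId":"66"},"63":{"cpm":0.0,"cpp":0,"vpvh":0.0,"imps":0.0,"rcImps":0.0,"ue":0.0,"grps":0.0,"demoId":"63"}}}"""

  // Reads the JSON strings with the explicit schema and returns one row per demoValues key.
  def toRows(spark: SparkSession, json: Dataset[String]): DataFrame =
    spark.read
      .schema(docSchema)
      .json(json)
      // explode on a map column yields a key column and a value column
      .select(col("_id.planId").as("planId"),
              explode(col("demoValues")).as(Seq("demoKey", "demo")))
      // flatten the struct value into ordinary columns
      .select(col("planId"), col("demoKey"), col("demo.*"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("DemoValuesToRows")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    toRows(spark, Seq(sample).toDS()).show(false)
    spark.stop()
  }
}
```

The resulting DataFrame has a fixed set of columns regardless of how many keys appear under demoValues, which is usually easier to write to a relational table than one struct column per key.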
dbf7pr2w #2
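Here is what I did using Vertica:
I created a flex table, loaded it, and used Vertica's flex table function
COMPUTE_FLEXTABLE_KEYS_AND_CREATE_VIEW()
to get a view. It turned out to be a single-row table:
For example, for the children: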