我正在将json转换成Dataframe。在第一步中,我创建一个Dataframe数组,然后创建一个并集。但是我有一个问题,在json中使用不同的模式进行联合。
如果json具有与您在另一个问题中看到的相同的模式,我可以这样做:使用sparkscala在列中解析json根
我正在处理以下数据:
val exampleJsonDifferentSchema = spark.createDataset(
"""
{"ITEM1512":
{"name":"Yin",
"address":{"city":"Columbus",
"state":"Ohio"},
"age":28 },
"ITEM1518":
{"name":"Yang",
"address":{"city":"Working",
"state":"Marc"}
},
"ITEM1458":
{"name":"Yossup",
"address":{"city":"Macoss",
"state":"Microsoft"},
"age":28
}
}""" :: Nil)
正如您所看到的,不同之处在于一个Dataframe没有年龄。
val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema
+---------------------------------+---------------------------+-----------------------+
|ITEM1458 |ITEM1512 |ITEM1518 |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+
root
|-- ITEM1458: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- age: long (nullable = true)
| |-- name: string (nullable = true)
|-- ITEM1512: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- age: long (nullable = true)
| |-- name: string (nullable = true)
|-- ITEM1518: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- name: string (nullable = true)
我现在的解决方案如下所示,我在其中创建了一个Dataframe数组:
val columns:Array[String] = itemsExample.columns
var arrayOfExampleDFs:Array[DataFrame] = Array()
for(col_name <- columns){
val temp = itemsExample.select(lit(col_name).as("Item"), col(col_name).as("Value"))
arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}
val jsonDF = arrayOfExampleDFs.reduce(_ union _)
但是我有一个json,它有不同的模式,当我在一个联合中减少时,我不能这样做,因为Dataframe需要有相同的模式。事实上,我有以下错误:
org.apache.spark.sql.analysisexception:只能对具有兼容列类型的表执行联合。
在这个问题上,我尝试做一些类似的事情:如何在spark中对两个列数不同的Dataframe执行并集?
特别是这一部分:
val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
但是我不能为列设置,因为我需要动态地捕获列的总计和单列。我只能这样做:
for(i <- 0 until arrayOfExampleDFs.length-1) {
val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
val total = cols1 ++ cols2
arrayOfExampleDFs(i).select("Value").printSchema()
print(total)
}
那么,一个函数怎么能动态地实现这个联合呢?
更新:预期输出
在这种情况下,此Dataframe和模式:
+--------+---------------------------------+
|Item |Value |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin] |
|ITEM1518|[[Working, Marc], null, Yang] |
+--------+---------------------------------+
root
|-- Item: string (nullable = false)
|-- Value: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- state: string (nullable = true)
| |-- age: long (nullable = true)
| |-- name: string (nullable = true)
1条答案
按热度按时间idfiyjo81#
下面是一个可能的解决方案,它通过在找不到Dataframe时添加age列来为所有Dataframe创建一个公共模式:
分析:可能最困难的部分是找出
age
是否存在。我们使用的查找df.schema.fields
属性,允许我们深入研究每列的内部模式。当未找到age时,我们使用
struct
: