Merging DataFrames with different schemas - Scala Spark

oewdyzsn asked on 2021-05-27 in Spark

I am converting JSON into DataFrames. In a first step I create an array of DataFrames and then perform a union. The problem is doing that union when the JSON items have different schemas.
I can already do it when the JSON items share the same schema, as you can see in this other question: Parse JSON root in a column using Spark-Scala.
This is the data I am working with:

import spark.implicits._ // provides the Encoder[String] needed by createDataset

val exampleJsonDifferentSchema = spark.createDataset(

      """
      {"ITEM1512":
            {"name":"Yin",
             "address":{"city":"Columbus",
                        "state":"Ohio"},
             "age":28           }, 
        "ITEM1518":
            {"name":"Yang",
             "address":{"city":"Working",
                        "state":"Marc"}
                        },
        "ITEM1458":
            {"name":"Yossup",
             "address":{"city":"Macoss",
                        "state":"Microsoft"},
            "age":28
                        }
      }""" :: Nil)

As you can see, the difference is that one of the items is missing the age field.

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema

+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+

root
 |-- ITEM1458: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

My current solution looks like this, where I build an array of DataFrames:

val columns: Array[String] = itemsExampleDiff.columns
var arrayOfExampleDFs:Array[DataFrame] = Array()

for(col_name <- columns){

  val temp = itemsExampleDiff.select(lit(col_name).as("Item"), col(col_name).as("Value"))

  arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)
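
As an aside, the same array can be built without the mutable var, by mapping over the column names directly (a minimal sketch, equivalent to the loop above):

// one single-row DataFrame per top-level item column
val arrayOfExampleDFs: Array[DataFrame] =
  itemsExampleDiff.columns.map { col_name =>
    itemsExampleDiff.select(lit(col_name).as("Item"), col(col_name).as("Value"))
  }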

But when the JSON items have different schemas, this fails at the reduce over union, because the DataFrames must all have the same schema. In fact, I get the following error:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
I tried to do something similar to this question: How to perform union on two DataFrames with different amounts of columns in Spark?
In particular this part:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}
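
For reference, in the linked question that helper is applied to two flat DataFrames roughly like this (a sketch; df1 and df2 stand for the two inputs, and note that it only pads missing top-level columns, not fields nested inside a struct, which is why it does not directly apply here):

// pad each side with null literals for the columns it lacks, then union
val unioned = df1.select(expr(cols1, total): _*)
                 .union(df2.select(expr(cols2, total): _*))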

But I can't build those column sets, because I need to capture the total and the per-DataFrame columns dynamically. The closest I can get is this:

for(i <- 0 until arrayOfExampleDFs.length-1) {

    val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
    val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
    val total = cols1 ++ cols2

    arrayOfExampleDFs(i).select("Value").printSchema()

    print(total)
}
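
Note that calling .columns on the Value projection always returns just Array("Value"), so cols1, cols2 and total never differ. To compare the schemas you have to drill into the struct type itself, e.g. (a sketch for any element of the array):

import org.apache.spark.sql.types.StructType

// inner field names of the struct held in "Value",
// e.g. Array("address", "age", "name") vs Array("address", "name")
val innerFields: Array[String] = arrayOfExampleDFs(0)
  .schema("Value")
  .dataType
  .asInstanceOf[StructType]
  .fieldNames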

So, how could a function perform this union dynamically?
Update: expected output
In this case, this is the DataFrame and schema I expect:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+

root
 |-- Item: string (nullable = false)
 |-- Value: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)

idfiyjo8 1#

Here is one possible solution. It creates a common schema for all the DataFrames by adding the age column wherever it is not found:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// same setup as in the question
val columns: Array[String] = itemsExampleDiff.columns
var arrayOfExampleDFs: Array[DataFrame] = Array()

for(col_name <- columns){
  val currentDf = itemsExampleDiff.select(col(col_name))

  // check whether the inner struct contains an age field
  // (an exact StructField match: name, type and nullability must all agree)
  val hasAge = currentDf.schema.fields(0)
                        .dataType
                        .asInstanceOf[StructType]
                        .fields
                        .contains(StructField("age", LongType, true))

  val valueCol = hasAge match {
    // if not construct a new value column
    case false => struct(
                    col(s"${col_name}.address"), 
                    lit(null).cast("bigint").as("age"),
                    col(s"${col_name}.name")
                  )

    case true => col(col_name)
  }

  arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+
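
As a quick sanity check, printing the schema of the result should now match the expected schema from the question, with age present as a nullable long in every row:

jsonDF.printSchema
// root
//  |-- Item: string (nullable = false)
//  |-- Value: struct (nullable = true)
//  |    |-- address: struct (nullable = true)
//  |    |    |-- city: string (nullable = true)
//  |    |    |-- state: string (nullable = true)
//  |    |-- age: long (nullable = true)
//  |    |-- name: string (nullable = true)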

Analysis: probably the most difficult part is figuring out whether age is present or not. For the lookup we use the df.schema.fields property, which lets us dig into the inner schema of each column.
When age is not found, we build a new value column with struct:

struct(
   col(s"${col_name}.address"), 
   lit(null).cast("bigint").as("age"),
   col(s"${col_name}.name")
)
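
The lookup above is hard-coded to the age field. Since the question asks for a fully dynamic union, here is a more generic sketch (my own variation, not part of the original answer): collect the union of all inner field names across the item structs first, then pad whatever is missing in each item with typed nulls.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.StructType

// union of all inner fields across the top-level item structs,
// keyed by name so each field keeps its original data type
val allFields = itemsExampleDiff.schema.fields
  .flatMap(_.dataType.asInstanceOf[StructType].fields)
  .map(f => f.name -> f)
  .toMap

val fieldNames = allFields.keys.toSeq.sorted // stable field order for the union

val dfs: Seq[DataFrame] = itemsExampleDiff.columns.toSeq.map { item =>
  val present = itemsExampleDiff.schema(item).dataType
    .asInstanceOf[StructType].fieldNames.toSet

  // existing fields are selected, missing ones become typed null literals
  val parts = fieldNames.map {
    case f if present(f) => col(s"$item.$f").as(f)
    case f               => lit(null).cast(allFields(f).dataType).as(f)
  }

  itemsExampleDiff.select(lit(item).as("Item"), struct(parts: _*).as("Value"))
}

val jsonDFGeneric = dfs.reduce(_ union _)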
