用包含复杂数据类型的嵌套架构替换spark dataframe中的空值

sqougxex  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(344)

我正在从json文件构建dataframe,模式在不断变化。下面是一个模式的示例。

root
|-- id: string (nullable = true)
|-- emprecords: array (nullable = true)
|   |-- element: struct (containsNull = true)
|       |-- emprec: array (nullable = true)
|           |-- element: struct (containsNull = true)
|               |-- firstName: string (nullable = true)
|               |-- lastName: string (nullable = true)
|               |-- email: string (nullable = true)
|               |-- salary: integer (nullable = true)

Dataframe示例:

ID     emprecords
 201    [[[Bruce, William,….
 202    Null
 203    [[[Mickey,Arthur……..
 204    Null

必须用空数组替换与ids 202和204相关联的行中存在的空值。这样做的原因是使用arrays\u zip函数将emprecords数组与另一个数组压缩,如果zip操作中包含的任何一个数组列为null,arrays\u zip函数将填充null。
我试过数组包含,转换函数捕捉和替换空值,但运气不好。

eagi6jfj

eagi6jfj1#

源Dataframe中的架构将更改。因此,在编写udf时,我不能显式地使用任何列名。因此,以下是实现这一点的方法:
val colname=“array column name”即emprecords在本例中val colindex=df.columns.indexof(colname)//其中df是从json val arrschema=df.schema(colindex.datatype)//读取Dataframe的数组列的架构创建的Dataframe
val emptyarrudf=udf(()=>seq.empty[any],arrschema)//创建空数组的udf
val df_updated=df.withcolumn(colname,when(col(colname).isnull,emptyarrudf())。否则(col(colname)))

11dmarpk

11dmarpk2#

筛选空值 emprecords 列添加(&A) array() 如果此列有空值,则保留原始数据。
更新

scala> df.show(false)
+------------------------------------------+---+
|emprecords                                |id |
+------------------------------------------+---+
|[[[[some email, Srinivas, Reddy, 10000]]]]|100|
|null                                      |101|
+------------------------------------------+---+

定义空json

scala> val emptyJson = """{"emprecords":[{"emprec":[{"firstName":"N/A","lastName":"N/A","email":"N/A","salary":0}]}]}"""

去拿那个 emprecords 来自当前Dataframe的列架构。

scala> val schema = df.select("emprecords").schema

使用 from_json 使用架构分析空记录。

scala> df
.withColumn("emprecords",
      when($"emprecords".isNull,from_json(lit(emptyJson),schema)("emprecords"))
     .otherwise($"emprecords")
)
.show(false)
+------------------------------------------+---+
|emprecords                                |id |
+------------------------------------------+---+
|[[[[some email, Srinivas, Reddy, 10000]]]]|100|
|[[[[N/A, N/A, N/A, 0]]]]                  |101|
+------------------------------------------+---+
yeotifhr

yeotifhr3#

也许这是有帮助的-

加载测试数据

val df = spark.sql(
      """
        | select id, emprecords
        |  from values
        | (201, array(named_struct('emprec', array(named_struct('firstName' , 'tom', 'lastName', 'hank'))))),
        | (202, null)
        | T(id, emprecords)
      """.stripMargin)
    df.printSchema()
    df.show(false)

    /**
      * root
      * |-- id: integer (nullable = false)
      * |-- emprecords: array (nullable = true)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- emprec: array (nullable = false)
      * |    |    |    |-- element: struct (containsNull = false)
      * |    |    |    |    |-- firstName: string (nullable = false)
      * |    |    |    |    |-- lastName: string (nullable = false)
      *
      * +---+-----------------+
      * |id |emprecords       |
      * +---+-----------------+
      * |201|[[[[tom, hank]]]]|
      * |202|null             |
      * +---+-----------------+
      */

替换空值

df.withColumn("emprecords",
      expr("ifnull(emprecords, array(named_struct('emprec', array(named_struct('firstName' , null, 'lastName', null)))))"))
      .show(false)

    /**
      * +---+-----------------+
      * |id |emprecords       |
      * +---+-----------------+
      * |201|[[[[tom, hank]]]]|
      * |202|[[[[,]]]]        |
      * +---+-----------------+
      */

相关问题