spark scala基于规则从数组列派生列

j5fpnvbx  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(366)

我对spark和scala是新手。我有一个json数组结构作为输入,类似于下面的模式。

root
|-- entity: struct (nullable = true)
|    |-- email: string (nullable = true)
|    |-- primaryAddresses: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- postalCode: string (nullable = true)
|    |    |    |-- streetAddress: struct (nullable = true)
|    |    |    |    |-- line1: string (nullable = true)

我将数组结构展平到下面的示例Dataframe

+-------------+--------------------------------------+--------------------------------------+
|entity.email |entity.primaryAddresses[0].postalCode |entity.primaryAddresses[1].postalCode |....
+-------------+--------------------------------------+--------------------------------------+
|a@b.com      |                                      |                                      |
|a@b.com      |                                      |12345                                 |
|a@b.com      |12345                                 |                                      |
|a@b.com      |0                                     |0                                     |
+-------------+--------------------------------------+--------------------------------------+

我的最终目标是为数据质量度量的每个列计算存在/不存在/零计数。但在计算数据质量度量之前,我正在寻找一种方法,为每个数组列元素派生一个新列,如下所示:
如果特定数组元素的所有值都为空,则该元素的派生列为空
如果数组元素至少存在一个值,则该元素的存在被视为1
如果数组元素的所有值都为零,则我将该元素标记为零(稍后计算数据质量时,我将其校准为presence=1和zero=1)
下面是一个示例中间Dataframe,我试图通过为每个数组元素派生的列来实现它。原始数组元素将被删除。

+-------------+--------------------------------------+
|entity.email |entity.primaryAddresses.postalCode    |.....
+-------------+--------------------------------------+
|a@b.com      |                                      |
|a@b.com      |1                                     |
|a@b.com      |1                                     |
|a@b.com      |0                                     |
+-------------+--------------------------------------+

输入json记录元素是动态的,可以更改。为了派生数组元素的列,我构建了一个scalaMap,其中有一个键作为列名,没有数组索引(example:entity.primaryaddresses.postalcode)值作为要对特定键运行规则的数组元素的列表。我正在寻找一种方法来实现上述中间Dataframe。
一个问题是,对于某些输入文件,在我展平dataframe之后,dataframe列计数超过70k+。由于记录数预计将以百万计,我想知道是否应该分解每个元素以获得更好的性能,而不是将json扁平化。
谢谢你的建议。谢谢您。

zujrkrfu

zujrkrfu1#

您可以利用自定义的用户定义函数来帮助您完成数据质量度量。

val postalUdf = udf((postalCode0: Int, postalCode1: Int) => {
        //TODO implement you logic here
    })

然后使用is创建一个新的dataframe列

df
  .withColumn("postcalCode", postalUdf(col("postalCode_0"), col("postalCode_1")))
  .show()
e0bqpujr

e0bqpujr2#

创建了helper函数&您可以直接调用 df.explodeColumns 在Dataframe上。下面的代码将展平多级数组和结构类型列。
使用below函数提取列,然后对其应用转换。

scala> df.printSchema
root
 |-- entity: struct (nullable = false)
 |    |-- email: string (nullable = false)
 |    |-- primaryAddresses: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- postalCode: string (nullable = false)
 |    |    |    |-- streetAddress: struct (nullable = false)
 |    |    |    |    |-- line1: string (nullable = false)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    def columns = {
      val dfColumns = df.columns.map(_.toLowerCase)
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
          dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}
scala> df.explodeColumns.printSchema
root
 |-- entity_email: string (nullable = false)
 |-- entity_primaryAddresses_postalCode: string (nullable = true)
 |-- entity_primaryAddresses_streetAddress_line1: string (nullable = true)

相关问题