我对spark和scala是新手。我有一个json数组结构作为输入,类似于下面的模式。
root
|-- entity: struct (nullable = true)
| |-- email: string (nullable = true)
| |-- primaryAddresses: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- postalCode: string (nullable = true)
| | | |-- streetAddress: struct (nullable = true)
| | | | |-- line1: string (nullable = true)
我将数组结构展平到下面的示例Dataframe
+-------------+--------------------------------------+--------------------------------------+
|entity.email |entity.primaryAddresses[0].postalCode |entity.primaryAddresses[1].postalCode |....
+-------------+--------------------------------------+--------------------------------------+
|a@b.com | | |
|a@b.com | |12345 |
|a@b.com |12345 | |
|a@b.com |0 |0 |
+-------------+--------------------------------------+--------------------------------------+
我的最终目标是为数据质量度量的每个列计算存在/不存在/零计数。但在计算数据质量度量之前,我正在寻找一种方法,为每个数组列元素派生一个新列,如下所示:
如果特定数组元素的所有值都为空,则该元素的派生列为空
如果数组元素至少存在一个值,则该元素的存在被视为1
如果数组元素的所有值都为零,则我将该元素标记为零(稍后计算数据质量时,我将其校准为presence=1和zero=1)
下面是一个示例中间Dataframe,我试图通过为每个数组元素派生的列来实现它。原始数组元素将被删除。
+-------------+--------------------------------------+
|entity.email |entity.primaryAddresses.postalCode |.....
+-------------+--------------------------------------+
|a@b.com | |
|a@b.com |1 |
|a@b.com |1 |
|a@b.com |0 |
+-------------+--------------------------------------+
输入json记录元素是动态的,可以更改。为了派生数组元素的列,我构建了一个scalaMap,其中有一个键作为列名,没有数组索引(example:entity.primaryaddresses.postalcode)值作为要对特定键运行规则的数组元素的列表。我正在寻找一种方法来实现上述中间Dataframe。
一个问题是,对于某些输入文件,在我展平dataframe之后,dataframe列计数超过70k+。由于记录数预计将以百万计,我想知道是否应该分解每个元素以获得更好的性能,而不是将json扁平化。
谢谢你的建议。谢谢您。
2条答案
按热度按时间zujrkrfu1#
您可以利用自定义的用户定义函数来帮助您完成数据质量度量。
然后使用is创建一个新的dataframe列
e0bqpujr2#
创建了helper函数&您可以直接调用
df.explodeColumns
在Dataframe上。下面的代码将展平多级数组和结构类型列。使用below函数提取列,然后对其应用转换。