好的,情况是这样的。我在数据块中有一个更改数据馈送表增量表,它看起来像这样:
| 名字|姓氏(_N)|喜爱的颜色|食品|体育|年龄|时间戳|
| --|--|--|--|--|--|--|
| 约翰|Doe|蓝色|披萨|足球| 28 |2023-01-01 12:00:00|
| 约翰|史密斯|红色|汉堡|篮球| 35 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 28 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 29 |2023-01-02 12:00:00|
它在不断更新,但我希望有逻辑,以便我可以获取唯一标识符,在本例中是名字和姓氏,然后只有更改的列,然后将输出保存为以下格式:
| 名字|姓氏(_N)|有效载荷|
| --|--|--|
| 简|Doe| {年龄:29,时间:1/2/2023 12:00:00}|
因此在上面的例子中,因为只有Jane Doe的age和timestamp列发生了变化,所以我想将其包含到json payload中。
再举一个例子:
| 名字|姓氏(_N)|喜爱的颜色|食品|体育|年龄|时间戳|
| --|--|--|--|--|--|--|
| 约翰|Doe|蓝色|披萨|足球| 28 |2023-01-01 12:00:00|
| 约翰|史密斯|红色|汉堡|篮球| 35 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 28 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 29 |2023-01-02 12:00:00|
| 约翰|史密斯|红色|汉堡|游泳| 35 |2023-01-03 12:00:00|
| 名字|姓氏(_N)|有效载荷|
| --|--|--|
| 简|Doe| {年龄:29,时间:1/2/2023 12:00:00}|
| 约翰|史密斯|{体育:游泳,timestamp:1/3/2023 12:00:00}|
我在pyspark中做了所有这些,并且很难开始。
1条答案
按热度按时间wlsrxk511#
要在PySpark中实现这一点,您需要遵循以下步骤:
阅读数据:将增量表加载到DataFrame中。识别更改:比较每个唯一标识符(first_name,last_name)的连续行,以检测其他列中的更改。构造有效负载:对于每对发生更改的行,创建一个仅包含更改的列及其新值的JSON有效负载。聚合结果:按唯一标识符分组并聚合有效负载。以下是一个分步方法:
第一步:读取数据
假设你的delta表在Databricks中,你可以将它读入DataFrame。例如:
字符串
从delta表加载数据
型
第二步:识别更改
要查找更改,可以使用滞后窗口函数将每行与其前一行进行比较,并按唯一标识符分组。
型
定义窗口规范
型
添加以前的值进行比较
型
只选择发生变化的行
型
第三步:构造负载
现在,为df_changes中的每一行创建一个JSON有效负载,其中只包含更改的列。
型
使用payload添加新列
型
第4步:聚合结果最后,选择所需的列,并在必要时进行聚合。
型
写输出
现在,您可以根据需要将result_df写回增量表或任何其他存储。
型
请记住,这是一个基本的模板。根据您的特定需求,例如处理大型数据集,确保效率或处理更复杂的更改,您可能需要调整逻辑。此外,请确保您的Spark环境已正确配置为使用Delta表。