如何创建一个新的pyspark对象框架,只捕获已更改为payload的列?

af7jpaap  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(47)

好的,情况是这样的。我在数据块中有一个更改数据馈送表增量表,它看起来像这样:
| 名字|姓氏(_N)|喜爱的颜色|食品|体育|年龄|时间戳|
| --|--|--|--|--|--|--|
| 约翰|Doe|蓝色|披萨|足球| 28 |2023-01-01 12:00:00|
| 约翰|史密斯|红色|汉堡|篮球| 35 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 28 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 29 |2023-01-02 12:00:00|
它在不断更新,但我希望有逻辑,以便我可以获取唯一标识符,在本例中是名字和姓氏,然后只有更改的列,然后将输出保存为以下格式:
| 名字|姓氏(_N)|有效载荷|
| --|--|--|
| 简|Doe| {年龄:29,时间:1/2/2023 12:00:00}|
因此在上面的例子中,因为只有Jane Doe的age和timestamp列发生了变化,所以我想将其包含到json payload中。
再举一个例子:
| 名字|姓氏(_N)|喜爱的颜色|食品|体育|年龄|时间戳|
| --|--|--|--|--|--|--|
| 约翰|Doe|蓝色|披萨|足球| 28 |2023-01-01 12:00:00|
| 约翰|史密斯|红色|汉堡|篮球| 35 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 28 |2023-01-01 12:00:00|
| 简|Doe|绿色|寿司|网球| 29 |2023-01-02 12:00:00|
| 约翰|史密斯|红色|汉堡|游泳| 35 |2023-01-03 12:00:00|
| 名字|姓氏(_N)|有效载荷|
| --|--|--|
| 简|Doe| {年龄:29,时间:1/2/2023 12:00:00}|
| 约翰|史密斯|{体育:游泳,timestamp:1/3/2023 12:00:00}|
我在pyspark中做了所有这些,并且很难开始。

wlsrxk51

wlsrxk511#

要在PySpark中实现这一点,您需要遵循以下步骤:
阅读数据:将增量表加载到DataFrame中。识别更改:比较每个唯一标识符(first_name,last_name)的连续行,以检测其他列中的更改。构造有效负载:对于每对发生更改的行,创建一个仅包含更改的列及其新值的JSON有效负载。聚合结果:按唯一标识符分组并聚合有效负载。以下是一个分步方法:

第一步:读取数据

假设你的delta表在Databricks中,你可以将它读入DataFrame。例如:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Initialize Spark Session
spark = SparkSession.builder.appName("Change Data Feed").getOrCreate()

字符串

从delta表加载数据

df = spark.read.format("delta").load("/path/to/delta/table")

第二步:识别更改

要查找更改,可以使用滞后窗口函数将每行与其前一行进行比较,并按唯一标识符分组。

from pyspark.sql.window import Window

定义窗口规范

windowSpec = Window.partitionBy("first_name", "last_name").orderBy("timestamp")

添加以前的值进行比较

df_with_prev = df.withColumn("prev_values", lag(struct([col for col in df.columns if col not in ["first_name", "last_name", "timestamp"]])).over(windowSpec))

只选择发生变化的行

df_changes = df_with_prev.filter("prev_values IS NOT NULL AND (prev_values != struct(age, favorite_color, food, sport))")

第三步:构造负载

现在,为df_changes中的每一行创建一个JSON有效负载,其中只包含更改的列。

def create_payload(row):
    payload = {}
    for col in ["age", "favorite_color", "food", "sport"]:
        if row[col] != row["prev_values"][col]:
            payload[col] = row[col]
    return payload

使用payload添加新列

df_payload = df_changes.withColumn("payload", udf(create_payload, MapType(StringType(), StringType()))(struct([df[col] for col in df.columns])))


第4步:聚合结果最后,选择所需的列,并在必要时进行聚合。

result_df = df_payload.select("first_name", "last_name", "payload").distinct()

写输出

现在,您可以根据需要将result_df写回增量表或任何其他存储。

result_df.write.format("delta").save("/path/to/output/delta/table")


请记住,这是一个基本的模板。根据您的特定需求,例如处理大型数据集,确保效率或处理更复杂的更改,您可能需要调整逻辑。此外,请确保您的Spark环境已正确配置为使用Delta表。

相关问题