pyspark 如何在结构化流中实现窗口函数?

p4tfgftt  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(52)

首先是一些背景,我最近不得不将我的管道从标准的阅读和写移动到结构化流,因为一些业务需求需要将我们所有的管道合并在一个中心位置。我所做的操作是,从source容器中读取数据,对数据进行一些转换,将其保存为staging中的表,然后在一些其他转换后将数据移动到analytics层。
我使用代码:

df = spark.read.parquet(path)
transformed_df = do_transformations(df, primary_keys, transformation_name)
saveAsTable_tostaging() //

字符串
我更新了上面的代码:

df = {spark.readStream.format.options.schema.load //}
transformed_df = do_transformations(df, primary_keys, transformation_name)
streaming_query = { transformed_df.writeStream.trigger ///}

Pseudo code for read and write streams. Note, the readStream() and writeStream() operations
are part of the central place and I cannot change them as other teams are also using it. 
I can only update my transformation() function. However, I am allowed to update the trigger details of the writeStream, which I changed to "once":True


对于我的用例,管道每天在4AM上运行一次。对于我的用例,没有实际的流作业,因为我们每天只获取一次数据,这就是为什么它仍然被视为批处理作业,运行一次并停止。在更改之后,几乎所有的转换仍然正常工作,整个读/写流程也正常工作,但有一个转换一直失败。
改造:在df中有一些特定的字段,让我们称它们为'additive-fields',对于所有keys,它们必须是summed。首先,我需要检查staging中的table already exists是否存在。如果存在,然后我需要读取staging表,并在相同字段上的staging tabledf的并集上执行sum操作,如果否,然后我只需要在df上执行sum操作,然后每个键的单个记录应该被发送到staging,其中包含这些'additive-fields'的总和
例如,设df =

|Key1| Key2| Nm1 | Nm2 | AdditiveField1| AdditiveField2| Db_timestamp| source_ts|
|----|-----|-----|-----|---------------|---------------|-------------|----------|
|A   |B    | Car | Bike| 10            | -0.02         |20231201     |20231127  |
|A   |B    |Taxi |Cycle|4              |-0.06          |20231201     |20231128  |
|B   |C    |Xyz  |ABC  |1              |-20            |20231201     |20231128  |


Staging_table =

|Key1|Key2|Nm1|Nm2|AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|--- |----|---|---|--------------|--------------|------------|---------|
| B  | C  |Old|PQR|       7      |       1      |  20231120  | 20231101|


Here, Key1, Key2 are the keys, NM* are the normal fields on which I just have to take the latest record sorted by source_ts, and the additive fields need to be summed.
所需输出为:

|Key1|Key2|Nm1 | Nm2 |AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|----|----|----|-----|--------------|--------------|------------|---------|
| A  | B  |Taxi|Cycle|      14      |   -0.08      | 20231201   | 20231128|
| B  | C  |Xyz | ABC |       8      |    -19       | 20231201   | 20231128|

编辑:抱歉,无法修复问题中的表结构,df、staging_df和输出如下https://prnt.sc/l3OuqsEWgGX-

当我使用标准读写时,我可以很容易地实现这一点:

def implement_additive_fields(df, table_name, primary_keys: list, fields: list):

    if check_tableExists(table_name): # returns true if table exists in staging else false.
        stmt = f"select * from {table_name}"
        staging_df = spark.sql(stmt)
        df = df.union(staging_df)

    window = Window.partitionBy(primary_keys).orderBy('source_ts').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    For i in fields: #these are the additive fields in the df
        df = df.withColumn(i, F.sum(i).over(window))

    df = df.withColumn('rank', F.row_number().over(window)).filter(F.col('rank')==1).drop(F.col('rank'))

   return df


在实现结构化流媒体之后,我明白我不能再在我的df上使用这些window functions,我需要使用structured streaming可用的window functions,其中有一些时间参数,但我无法理解如何实现它们,因为我仍然作为batch运行此作业(每天一次),然后它关闭。这些表没有实际的流作业,我想对来自staging表的记录以及流df进行sum

vshtjzan

vshtjzan1#

您可以使用foreachBatch来调用函数并执行转换。
像这样修改你的函数:

def implement_additive_fields(df, table_name, primary_keys: list, fields: list):

    if check_tableExists(table_name):
        stmt = f"select * from {table_name}"
        staging_df = spark.sql(stmt)
        df = df.union(staging_df)

    window1 = Window.partitionBy(primary_keys).orderBy(F.desc('source_ts'))
    window2 = Window.partitionBy(primary_keys).orderBy('source_ts')

    for i in fields:
        df = df.withColumn(i, F.sum(i).over(window2))

    df = df.withColumn('rank', F.row_number().over(window1)).filter(F.col('rank')==1).drop(F.col('rank'))
    df.write.saveAsTable("tmp_staged")
    print("Done")

字符串
把它叫做:

primary_keys=['Key1','Key2']
fields=['AdditiveField1','AdditiveField2']
stream_df = spark.readStream.schema(schema=schema).format("csv").option("header","true").load("/csvs/")
stream_df.writeStream.option("checkpointlocation","tmpcheckpointloc").foreachBatch(lambda df,id: implement_additive_fields(df,table_name,primary_keys=primary_keys,fields=fields)).trigger(once=True).start().awaitTermination()


以下是使用的输入数据:
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|车|自行车| 10 | -0.02 | 20231201 | 20231127 |
| 一|B|出租车|周期| 4 | -0.06 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 1 |-2.0| 2023431201 | 20231128 |
输出量:
x1c 0d1x的数据
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|出租车|周期| 14 | -0.08 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 8 |-1个| 2023431201 | 20231128 |

相关问题