首先是一些背景,我最近不得不将我的管道从标准的阅读和写移动到结构化流,因为一些业务需求需要将我们所有的管道合并在一个中心位置。我所做的操作是,从source
容器中读取数据,对数据进行一些转换,将其保存为staging
中的表,然后在一些其他转换后将数据移动到analytics
层。
我使用代码:
df = spark.read.parquet(path)
transformed_df = do_transformations(df, primary_keys, transformation_name)
saveAsTable_tostaging() //
字符串
我更新了上面的代码:
df = {spark.readStream.format.options.schema.load //}
transformed_df = do_transformations(df, primary_keys, transformation_name)
streaming_query = { transformed_df.writeStream.trigger ///}
Pseudo code for read and write streams. Note, the readStream() and writeStream() operations
are part of the central place and I cannot change them as other teams are also using it.
I can only update my transformation() function. However, I am allowed to update the trigger details of the writeStream, which I changed to "once":True
型
对于我的用例,管道每天在4AM
上运行一次。对于我的用例,没有实际的流作业,因为我们每天只获取一次数据,这就是为什么它仍然被视为批处理作业,运行一次并停止。在更改之后,几乎所有的转换仍然正常工作,整个读/写流程也正常工作,但有一个转换一直失败。
改造:在df
中有一些特定的字段,让我们称它们为'additive-fields'
,对于所有keys
,它们必须是summed
。首先,我需要检查staging
中的table already exists
是否存在。如果存在,然后我需要读取staging
表,并在相同字段上的staging table
和df
的并集上执行sum
操作,如果否,然后我只需要在df
上执行sum
操作,然后每个键的单个记录应该被发送到staging
,其中包含这些'additive-fields'
的总和
例如,设df
=
|Key1| Key2| Nm1 | Nm2 | AdditiveField1| AdditiveField2| Db_timestamp| source_ts|
|----|-----|-----|-----|---------------|---------------|-------------|----------|
|A |B | Car | Bike| 10 | -0.02 |20231201 |20231127 |
|A |B |Taxi |Cycle|4 |-0.06 |20231201 |20231128 |
|B |C |Xyz |ABC |1 |-20 |20231201 |20231128 |
型Staging_table
=
|Key1|Key2|Nm1|Nm2|AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|--- |----|---|---|--------------|--------------|------------|---------|
| B | C |Old|PQR| 7 | 1 | 20231120 | 20231101|
型Here, Key1, Key2 are the keys, NM* are the normal fields on which I just have to take the latest record sorted by source_ts, and the additive fields need to be summed.
个
所需输出为:
|Key1|Key2|Nm1 | Nm2 |AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
|----|----|----|-----|--------------|--------------|------------|---------|
| A | B |Taxi|Cycle| 14 | -0.08 | 20231201 | 20231128|
| B | C |Xyz | ABC | 8 | -19 | 20231201 | 20231128|
型
编辑:抱歉,无法修复问题中的表结构,df、staging_df和输出如下https://prnt.sc/l3OuqsEWgGX-
当我使用标准读写时,我可以很容易地实现这一点:
def implement_additive_fields(df, table_name, primary_keys: list, fields: list):
if check_tableExists(table_name): # returns true if table exists in staging else false.
stmt = f"select * from {table_name}"
staging_df = spark.sql(stmt)
df = df.union(staging_df)
window = Window.partitionBy(primary_keys).orderBy('source_ts').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
For i in fields: #these are the additive fields in the df
df = df.withColumn(i, F.sum(i).over(window))
df = df.withColumn('rank', F.row_number().over(window)).filter(F.col('rank')==1).drop(F.col('rank'))
return df
型
在实现结构化流媒体之后,我明白我不能再在我的df
上使用这些window functions
,我需要使用structured streaming
可用的window functions
,其中有一些时间参数,但我无法理解如何实现它们,因为我仍然作为batch
运行此作业(每天一次),然后它关闭。这些表没有实际的流作业,我想对来自staging
表的记录以及流df
进行sum
。
1条答案
按热度按时间vshtjzan1#
您可以使用
foreachBatch
来调用函数并执行转换。像这样修改你的函数:
字符串
把它叫做:
型
以下是使用的输入数据:
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|车|自行车| 10 | -0.02 | 20231201 | 20231127 |
| 一|B|出租车|周期| 4 | -0.06 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 1 |-2.0| 2023431201 | 20231128 |
输出量:
x1c 0d1x的数据
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|出租车|周期| 14 | -0.08 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 8 |-1个| 2023431201 | 20231128 |