Pyspark窗口,统计显示和不显示的约会与一天的滞后

gpnt7bae  于 6个月前  发布在  Spark
关注(0)|答案(2)|浏览(58)

我想添加两列到pyspark对象框中,计算psdln.awz列中True或False计数的总和,并为每个唯一的psyin_iden_rn数字执行此操作。然而,条件是它只计算它正在计算的行之前的日期的True或False的总和。因此,需要延迟一天。还有一件事它需要做的是计算每个约会,而不仅仅是天数。

示例数据

数据= [(1,'2023-01- 01',True),(1,'2023-01- 01',True),(1,'2023-01- 02',True),(1,“2023-01- 02',False),(1,”2023-01- 03',False),(1,“2023-01- 04',True),(2,”2023 -01- 02',True),(2,"2023-01- 02',False),(2,“2023-01- 03',False),]
columns ='psyin_iden_rn',' psdln.dat ',' psdln.awz ']
输出数据= [(1,'2023-01- 01',True,0,0),(1,'2023-01- 01',True,0,0),(1,'2023-01- 02',True,2,0),(1,'2023-01- 02',False,2,0),(1,'2023-01- 03',False,3,1),(1,'2023-01- 04',True,3,2),(1,'2023-01- 02',True,0,0),(1,'2023-01- 02',False,0,0),(1,'2023-01- 03',False,1,1),]
列=“psyin_iden_rn”,“psdln.dat”,“psdln.awz”,“show_cnt”,“no_show_cnt”]
我已经尝试了许多不同的东西,但它们似乎都导致我的一些条件不被满足。要么它开始计数的基础上,它正在计算的行不仅为前几天。或者它有问题的日期出现两次。例如,它只计算这些日期一次,或者像下面的示例代码一样,它不计算之前几天的总和,而是计算之前任何日期的总和。(也是同一天)。

创建窗口规范

window_spec = Window.partitionBy('psyin_iden_rn').orderBy(' psdln.dat ').rowsBetween(Window.unboundedPreceding,-1)

定义show_cnt和no_show_cnt的条件

show_condition =(F.col(' psdln.awz ')== True)no_show_condition =(F.col(' psdln.awz ')== False)

使用窗口函数计算show_cnt和no_show_cnt

DF_contacten =(DF_contacten .withColumn('show_cnt',F.sum(F.when(show_condition,1).otherwise(0)).over(window_spec)).withColumn('no_show_cnt',F.sum(F.when(no_show_condition,1).otherwise(0)).over(window_spec)
我如何调整或使用这个窗口,以便我得到一个输出,如在上面的示例中,

vddsk6oq

vddsk6oq1#

我认为你分享的输出数据内容有一些差异。这是我为你提供的解决方案。本质上,我使用的是显示和不显示条件的反向合计。一旦你有了所需的列,请删除额外的列。

from pyspark.sql import Window
data = [ (1, '2023-01-01', True), (1, '2023-01-01', True), (1, '2023-01-02', True), (1, '2023-01-02', False), (1, '2023-01-03', False), (1, '2023-01-04', True), (2, '2023-01-02', True), (2, '2023-01-02', False), (2, '2023-01-03', False), ]
columns = ['psyin_iden_rn', 'psdln_dat', 'psdln_awz']
input=spark.createDataFrame(data,schema=columns)
true_cond = when(col('psdln_awz') == "true", 1).otherwise(0)
false_cond = when(col('psdln_awz') == "false", 1).otherwise(0)
input=input.withColumn("pt_ws_total",sum(true_cond).over(Window.partitionBy("psyin_iden_rn").orderBy(lit("A"))))
input=input.withColumn("pt_ws_n_total",sum(false_cond).over(Window.partitionBy("psyin_iden_rn").orderBy(lit("A"))))
window_spec=Window.partitionBy("psyin_iden_rn").orderBy(col('psyin_iden_rn').desc(),col('psdln_dat').desc())
input=input.withColumn("rvrs_ws_total",sum(true_cond).over(window_spec))
input=input.withColumn("rvrs_n_ws_total",sum(false_cond).over(window_spec))
input=input.sort('psyin_iden_rn','psdln_dat')
input=input.withColumn("show_cnt",col("pt_ws_total")-col("rvrs_ws_total"))
input=input.withColumn("no_show_cnt",col("pt_ws_n_total")-col("rvrs_n_ws_total"))
input.show()

字符串

icnyk63a

icnyk63a2#

我认为原始方法的问题在于,窗口函数计算当前行日期和以前日期的psdln.awz列中True和False的出现次数。您需要它只计算当前行日期之前的日期,尊重唯一的psyin_iden_rn值,并单独考虑每个约会,即使它们发生在同一天
检查下面的代码。我已经修改了PySpark代码中的窗口规范,将数据按psyin_iden_rn分区,并按psdln.dat排序。关键的变化是将窗口的范围设置为考虑当前行(rowsBetween(Window.unboundedPreceding, -1))之前的所有行,但不包括当前行(rowsBetween(Window.unboundedPreceding, -1))

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DateType

data = [(1, '2023-01-01', True), (1, '2023-01-01', True), (1, '2023-01-02', True),
        (1, '2023-01-02', False), (1, '2023-01-03', False), (1, '2023-01-04', True),
        (2, '2023-01-02', True), (2, '2023-01-02', False), (2, '2023-01-03', False)]

columns = ['psyin_iden_rn', 'psdln.dat', 'psdln.awz']
df = spark.createDataFrame(data, columns)

df = df.withColumn('psdln.dat', F.col('psdln.dat').cast(DateType()))

daily_counts = df.groupBy('psyin_iden_rn', 'psdln.dat').agg(
    F.sum(F.when(F.col('psdln.awz') == True, 1).otherwise(0)).alias('daily_show_cnt'),
    F.sum(F.when(F.col('psdln.awz') == False, 1).otherwise(0)).alias('daily_no_show_cnt')
)

window_spec = Window.partitionBy('psyin_iden_rn').orderBy('psdln.dat').rowsBetween(Window.unboundedPreceding, -1)

# Calculate cumulative counts
daily_counts = daily_counts.withColumn('show_cnt', F.sum('daily_show_cnt').over(window_spec))
daily_counts = daily_counts.withColumn('no_show_cnt', F.sum('daily_no_show_cnt').over(window_spec))

# Join back to the original dataframe to get the final result
result_df = df.join(daily_counts, ['psyin_iden_rn', 'psdln.dat'])

result_df.show()

字符串

相关问题