pyspark 查找分区上30到365天之间的事件

9cbw7uwe  于 5个月前  发布在  Spark
关注(0)|答案(2)|浏览(37)

我有DF,其中包括患者ID和患者接受医疗程序的日期。我需要过滤DF,以仅包括至少接受过两次手术的患者,两次手术间隔30到365天。我只需要保留患者ID和符合时间范围标准的第一次手术。
原始DF:
| 患者ID|日期|
| --|--|
| 一|18年3月1日|
| 一|18年3月15日|
| B| 19年4月1日|
| B| 19年4月4日|
| B| 19年4月7日|
| B| 19年6月3日|
和滤波后的DF:
| 患者ID|日期|
| --|--|
| B| 19年4月7日|
这是我试过的代码.

w=Window.partitionBy("Pat_ID").orderBy(col("date"))
for i in range(1, 366): 
    df = df.withColumn(f"daysbetween_{i}", when ((datediff((F.lead(F.col('dx_date'), i).over(w)), "dx_date").between(30, 365)),1).otherwise(0))

字符串

oxiaedzo

oxiaedzo1#

  • 编辑:使用范围连接条件添加 *

我这样做的总体方法是:
1.通过患者ID和range join conditions将表连接到自身
1.计算每一行的成对日期差
1.按患者ID分组,最早日期的汇总

然而,这种方法的主要问题是你可能会耗尽内存,这取决于你的数据集;因为你可能会为每个病人的N程序生成N^2行。我已经添加了过滤器来只生成有效的日期对(在30-365天范围内),但是如果你的数据集真的很大,它 * 仍然有可能耗尽内存 *。
以下是我的方法:

首先,模拟数据集

from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

df = spark.createDataFrame([
    ("A", datetime(2018,3,1))
    ,("A", datetime(2018,3,15))
    ,("B", datetime(2019,4,1))
    ,("B", datetime(2019,4,4))
    ,("B", datetime(2019,4,7))
    ,("B", datetime(2019,6,3))
    ,("C", datetime(2018,1,1))
    ,("D", datetime(2018,1,1))
    ,("D", datetime(2018,3,1))
    ,("D", datetime(2020,1,1))
  ]
  ,schema="patient_id string, procedure_date date"
)

df.show()

个字符
接下来,我创建DataFrame filter_df,根据最大范围筛选潜在患者。这将筛选出:

  • 少于2次手术的患者
  • 两次最远手术间隔不到30天的患者

我不能使用between(30,365)max_range <= 365,因为即使最大-最小范围超过365天的限制,也可能存在最小-最大对之间的差异小于365天的日期对。
但是,反之则不然。如果最大-最小范围小于30天,则不能有另一个范围大于30天的日期对。

filter_df = df.groupBy("patient_id")\
  .agg(
     F.min("procedure_date").alias("first_date")
    ,F.max("procedure_date").alias("latest_date")
  )\
  .withColumn("max_range", F.date_diff("latest_date", "first_date"))\
  .filter(F.col("max_range") >= 30)

filter_df.show()
+----------+----------+-----------+---------+
|patient_id|first_date|latest_date|max_range|
+----------+----------+-----------+---------+
|         B|2019-04-01| 2019-06-03|       63|
|         D|2018-01-01| 2020-01-01|      730|
+----------+----------+-----------+---------+

的字符串
使用inner join应用过滤器:

df = df.join(
  filter_df.select("patient_id")
  ,on="patient_id"
  ,how="inner"
)

df.show()
+----------+--------------+
|patient_id|procedure_date|
+----------+--------------+
|         B|    2019-04-01|
|         B|    2019-04-04|
|         B|    2019-04-07|
|         B|    2019-06-03|
|         D|    2018-01-01|
|         D|    2018-03-01|
|         D|    2020-01-01|
+----------+--------------+

的字符串
然后,我通过患者id和range join conditions(基本上,将date_diff().between()条件添加到join(on=...)参数) 将这个过滤后的结果与其本身连接起来。为此,我必须重命名患者id和日期列以区分它们。
这为我提供了所有患者的所有有效日期对。
使用范围连接条件,我避免了生成4^2 + 3^2 = 25行的二次行为,而是只得到日期范围(30,365)天内的4个有效日期对。
无论你使用F.date_diff("date_b", "date_a")还是F.date_diff("date_a", "date_b")都没有关系--成对差分是对称的,你会得到相同的结果。负结果也没有关系,原因也是一样的。

df = df\
  .withColumnRenamed("patient_id", "id_a")\
  .withColumnRenamed("procedure_date", "date_a")\
  .join(
    df\
      .withColumnRenamed("patient_id", "id_b")\
      .withColumnRenamed("procedure_date", "date_b")
    ,on=[
      F.col("id_a") == F.col("id_b")
      ,F.date_diff("date_b", "date_a").between(30, 365)
    ]
    ,how="inner"
  )\
  .withColumn("date_diff", F.date_diff("date_b", "date_a"))

df.show()
+----+----------+----+----------+---------+
|id_a|    date_a|id_b|    date_b|date_diff|
+----+----------+----+----------+---------+
|   B|2019-04-01|   B|2019-06-03|       63|
|   B|2019-04-04|   B|2019-06-03|       60|
|   B|2019-04-07|   B|2019-06-03|       57|
|   D|2018-01-01|   D|2018-03-01|       59|
+----+----------+----+----------+---------+

然后我按患者ID分组,并选择min("date_a")以获得符合此条件的 * 第一个程序 *。
但是我注意到我的代码发现患者B的1 April 2019是答案,而不是您的示例7 April 2019结果。
根据你对“第一次”的理解,你需要修改最后一个代码块来得到你的答案。

df = df\
  .groupBy("id_a")\
  .agg(F.min("date_a").alias("procedure_date"))\
  .select(
    F.col("id_a").alias("patient_id")
    ,"procedure_date"
  )

df.show()
+----------+--------------+
|patient_id|procedure_date|
+----------+--------------+
|         B|    2019-04-01|
|         D|    2018-01-01|
+----------+--------------+
iq0todco

iq0todco2#

一种使用内置Spark函数而不使用joins的方法(出于性能原因):
1.将数据按patient_idcollect all dates into a list分组。
1.对于每个日期列表,使用this answer生成所有可能的日期对。

  1. Filter out不符合标准的日期对长于30天且短于365天。
  2. Sort剩余的对,使得具有最小开始日期的对是列表的第一个元素。
    1.第一对的第一个日期是预产期。
from pyspark.sql import functions as F

df = ...

df.groupBy("patient_id").agg(F.collect_list("procedure_date")
                             .alias("procedure_dates"))\
  .withColumn("procedure_dates",
    F.filter(
      F.transform(
        F.flatten(F.transform(
                c:="procedure_dates",
                lambda x: F.arrays_zip(F.array_repeat(x, F.size(c)), c)
        )),
        lambda x: F.array(x["0"], x[c])
      ),
      lambda x: x[0] < x[1]
    ))\
  .withColumn("procedure_dates", F.filter("procedure_dates", 
       lambda col: F.datediff(col[1], col[0]).between(30, 365))) \ 
 .withColumn("procedure_dates", F.sort_array("procedure_dates")) \
  .withColumn("result", F.col("procedure_dates")[0][0]) \
  .drop("procedure_dates") \
  .show()

字符串
测试结果:

+----------+----------+
|patient_id|    result|
+----------+----------+
|         A|      null|
|         B|2019-04-01|
+----------+----------+

相关问题