pyspark: splitting rows in a DataFrame into 5-minute time intervals on certain conditions

zte4gxcn · published 2021-05-27 in Spark

I have a DataFrame with the following columns:

+-----+----------+--------------------------+-----------+
|id   | sourceid |        timestamp         | indicator |
+-----+----------+--------------------------+-----------+
| 0   |  128     |  2019-12-03 12:00:00.0   | 0         |
| 1   |  128     |  2019-12-03 12:30:00.0   | 1         |
| 2   |  128     |  2019-12-03 12:37:00.0   | 0         |
| 3   |  128     |  2019-12-03 13:15:00.0   | 1         |
| 4   |  128     |  2019-12-03 13:17:00.0   | 0         | 
+-----+----------+--------------------------+-----------+

I am trying to split the timestamp column into rows at 5-minute intervals wherever the indicator value is not 0.
Explanation:
The first entry has timestamp = 2019-12-03 12:00:00.0 and indicator = 0, so nothing happens.
The next entry has timestamp = 2019-12-03 12:30:00.0 and indicator = 1; here I want to split the timestamp into rows at 5-minute intervals until we reach the next entry, timestamp = 2019-12-03 12:37:00.0 with indicator = 0.
In a case like timestamp = 2019-12-03 13:15:00.0, indicator = 1 followed by timestamp = 2019-12-03 13:17:00.0, indicator = 0, I also want to split the row, since 13:17:00.0 falls between 13:15:00.0 and 13:20:00.0, as shown below.
How can I achieve this with PySpark?
Expected output:

+-----+----------+--------------------------+-------------+
|id   | sourceid |        timestamp         | indicator   |
+-----+----------+--------------------------+-------------+
| 1   | 128      |  2019-12-03 12:30:00.0   | 1           |
| 1   | 128      |  2019-12-03 12:35:00.0   | 1           |
| 4   | 128      |  2019-12-03 13:15:00.0   | 1           |
| 4   | 128      |  2019-12-03 13:20:00.0   | 1           |
+-----+----------+--------------------------+-------------+

3zwtqj6y1#

IIUC, you can filter the rows based on the indicator values of the current and the next row, and then use array + explode to create the new rows (for testing purposes, I added more rows to the original sample):

from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('sourceid').orderBy('timestamp')

# add a flag to check if the next indicator is '0'

df1 = df.withColumn('next_indicator_is_0', F.lead('indicator').over(w1) == 0) 
df1.show(truncate=False)
+---+--------+---------------------+---------+-------------------+
|id |sourceid|timestamp            |indicator|next_indicator_is_0|
+---+--------+---------------------+---------+-------------------+
|0  |128     |2019-12-03 12:00:00.0|0        |false              |
|1  |128     |2019-12-03 12:30:00.0|1        |true               |
|2  |128     |2019-12-03 12:37:00.0|0        |false              |
|3  |128     |2019-12-03 13:12:00.0|1        |false              |
|4  |128     |2019-12-03 13:15:00.0|1        |true               |
|5  |128     |2019-12-03 13:17:00.0|0        |false              |
|6  |128     |2019-12-03 13:20:00.0|1        |null               |
+---+--------+---------------------+---------+-------------------+
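The next_indicator_is_0 column above is just lead() compared against 0. As a check of the logic (not the Spark API itself), the same flag can be sketched in plain Python over the indicator sequence shown in the table:

```python
# Plain-Python analogue of F.lead('indicator').over(w1) == 0: compare each
# row's *next* indicator with 0. The last row has no successor, so its flag
# is None, mirroring the null that Spark's lead() produces there.
indicators = [0, 1, 0, 1, 1, 0, 1]  # the indicator column from the table above
next_indicator_is_0 = [nxt == 0 for nxt in indicators[1:]] + [None]
print(next_indicator_is_0)
# → [False, True, False, False, True, False, None]
```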

df1.filter("indicator = 1 AND next_indicator_is_0") \
   .withColumn('timestamp', F.expr("explode(array(`timestamp`, `timestamp` + interval 5 minutes))")) \
   .drop('next_indicator_is_0') \
   .show(truncate=False)
+---+--------+---------------------+---------+
|id |sourceid|timestamp            |indicator|
+---+--------+---------------------+---------+
|1  |128     |2019-12-03 12:30:00.0|1        |
|1  |128     |2019-12-03 12:35:00  |1        |
|4  |128     |2019-12-03 13:15:00.0|1        |
|4  |128     |2019-12-03 13:20:00  |1        |
+---+--------+---------------------+---------+
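As a sanity check, the whole filter-then-explode pipeline can be mirrored in plain Python on the extended sample. Plain tuples stand in for the DataFrame rows here; this only re-traces the logic and is not the Spark code itself:

```python
from datetime import datetime, timedelta

# The extended sample from the answer, as (id, sourceid, timestamp, indicator).
rows = [
    (0, 128, datetime(2019, 12, 3, 12, 0), 0),
    (1, 128, datetime(2019, 12, 3, 12, 30), 1),
    (2, 128, datetime(2019, 12, 3, 12, 37), 0),
    (3, 128, datetime(2019, 12, 3, 13, 12), 1),
    (4, 128, datetime(2019, 12, 3, 13, 15), 1),
    (5, 128, datetime(2019, 12, 3, 13, 17), 0),
    (6, 128, datetime(2019, 12, 3, 13, 20), 1),
]

out = []
for (rid, src, ts, ind), nxt in zip(rows, rows[1:]):
    # keep a row only when its indicator is 1 and the next row's is 0 ...
    if ind == 1 and nxt[3] == 0:
        # ... then "explode" it into the timestamp and timestamp + 5 minutes
        out.append((rid, src, ts, ind))
        out.append((rid, src, ts + timedelta(minutes=5), ind))

for r in out:
    print(r)
```

This yields the same four rows (ids 1 and 4) as the Spark output above; the last sample row (id 6) is dropped because lead() has no successor to compare against.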

Note: you can reset the id column using F.row_number().over(w1) or F.monotonically_increasing_id(), depending on your requirements.
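For illustration, a sequential id reset (what F.row_number().over(w1) would give, minus the distributed machinery) can be sketched in plain Python on the result rows; the tuples below are hypothetical stand-ins for the output:

```python
# Renumber the result rows 1..n in timestamp order: the plain-Python
# analogue of F.row_number().over(w1) for a single sourceid partition.
result = [
    (1, "2019-12-03 12:30:00"), (1, "2019-12-03 12:35:00"),
    (4, "2019-12-03 13:15:00"), (4, "2019-12-03 13:20:00"),
]
renumbered = [
    (i, ts) for i, (_, ts) in enumerate(sorted(result, key=lambda r: r[1]), start=1)
]
print(renumbered)
```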
