pyspark-操纵Dataframe以获取状态的时间变化

qni6mghb  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(263)

我在pyspark中有一个Dataframe描述服务的状态,如下所示。我接收服务状态更新的频率不是恒定的。
更新的状态时间戳:020-01-01 14:30:00ok2020-01-01 14:15:00broken2020-01-01 14:10:00broken2020-01-01 14:00:00broken2020-01-01 13:40:00broken2020-01-01 13:35:00ok2020-01-01 13:15:00ok2020-01-01 13:00:00ok2020-01-01 12:40:00
基于此,我想创建一个列,提供自上次更新以来满足以下条件的时间:
如果状态为“ok”,并且服务一直在运行而没有出现问题,则表示自上次更新以来的时间差。
当状态报告为“坏”时,我想要服务停止后的时间。
当状态再次恢复为ok时,则差值应为零。
所以,最终的数据集应该是这样的。
状态时间戳\u updatetime \u gone \u byok2020-01-01 14:30:0015minsok2020-01-01 14:15:000MINSBROKEN020-01-01 14:10:0055MINSBROKEN020-01-01 14:00:0045MINSBROKEN020-01-01 13:40:0025MINSROKEN020-01-01 13:35:0020minsok2020-01-01 13:15:0015minsok2020-01-01 13:00:0020minsok2020-01-01 12:40:00
有人知道如何在Pypark中做到这一点吗?谢谢!

xytpbqjk

xytpbqjk1#

您可以创建一些帮助器列来检查所需的条件:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'first_row',
    F.row_number().over(Window.orderBy('timestamp_of_update')) == 1
).withColumn(
    'change_to_ok', 
    (F.lag('status').over(Window.orderBy('timestamp_of_update')) != 'OK') & 
    (F.col('status') == 'OK') 
).withColumn(
    'last_ok', 
    F.last(
        F.when(F.col('status') == 'OK', F.col('timestamp_of_update')), 
        True
    ).over(Window.orderBy('timestamp_of_update'))
).withColumn(
    'time',
    F.when(
        F.col('status') == 'Broken',
        F.col('last_ok')
    ).when(
        F.col('change_to_ok'),
        F.col('timestamp_of_update')
    ).when(
        F.col('status') == 'OK',
        F.lag('timestamp_of_update').over(Window.orderBy('timestamp_of_update'))
    )
).withColumn(
    'time_gone_by', 
    (F.unix_timestamp('timestamp_of_update') - F.unix_timestamp('time'))/60
).select('status', 'timestamp_of_update', 'time_gone_by')

结果:

df2.show()
+------+-------------------+------------+
|status|timestamp_of_update|time_gone_by|
+------+-------------------+------------+
|    OK|2020-01-01 12:40:00|        null|
|    OK|2020-01-01 13:00:00|        20.0|
|    OK|2020-01-01 13:15:00|        15.0|
|Broken|2020-01-01 13:35:00|        20.0|
|Broken|2020-01-01 13:40:00|        25.0|
|Broken|2020-01-01 14:00:00|        45.0|
|Broken|2020-01-01 14:10:00|        55.0|
|    OK|2020-01-01 14:15:00|         0.0|
|    OK|2020-01-01 14:30:00|        15.0|
+------+-------------------+------------+

幕后:

+------+-------------------+---------+------------+-------------------+-------------------+------------+
|status|timestamp_of_update|first_row|change_to_ok|            last_ok|               time|time_gone_by|
+------+-------------------+---------+------------+-------------------+-------------------+------------+
|    OK|2020-01-01 12:40:00|     true|        null|2020-01-01 12:40:00|               null|        null|
|    OK|2020-01-01 13:00:00|    false|       false|2020-01-01 13:00:00|2020-01-01 12:40:00|        20.0|
|    OK|2020-01-01 13:15:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:00:00|        15.0|
|Broken|2020-01-01 13:35:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        20.0|
|Broken|2020-01-01 13:40:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        25.0|
|Broken|2020-01-01 14:00:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        45.0|
|Broken|2020-01-01 14:10:00|    false|       false|2020-01-01 13:15:00|2020-01-01 13:15:00|        55.0|
|    OK|2020-01-01 14:15:00|    false|        true|2020-01-01 14:15:00|2020-01-01 14:15:00|         0.0|
|    OK|2020-01-01 14:30:00|    false|       false|2020-01-01 14:30:00|2020-01-01 14:15:00|        15.0|
+------+-------------------+---------+------------+-------------------+-------------------+------------+

相关问题