如何根据pyspark数据drame中行之间的差异计算并创建新列

9ceoxa92  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(322)

我的df如下所示:

objid|gpstime            | gpsspeed|
+------+-------------------+-----
|X   |2018-04-03 11:00:40|       10|
|X   |2018-04-03 11:00:47|       15|
|X   |2018-04-03 11:00:50|       10|
|Y   |2018-04-03 11:00:52|       30|
|Y   |2018-04-03 11:00:59|       50|

结果如下:

objid|gpstime|          gpsspeed|timeDiff |speedDiff|
+------+-------------------+--------+---------+---------+
|X|2018-04-03 11:00:40|       10|        -|         |
|X|2018-04-03 11:00:47|       15|        7|        5|
|X|2018-04-03 11:00:50|       10|        3|       -5|
|Y|2018-04-03 11:00:52|       30|        2|       20|
|Y|2018-04-03 11:00:59|       50|        7|       20|

所以我需要创建两个新的列的基础上从现有的区别,但我有一个问题。我的一列代码如下所示:

from pyspark.sql.functions import
from pyspark.sql.window import Window

df.withColumn("time_intertweet", datediff(df.gpstime, lag(df.gpstime, 1)
.over(Window.partitionBy("gpstime")
.orderBy("gpstime"))))

你知道怎么修吗?

kcwpcxri

kcwpcxri1#

您不应该按任何内容进行分区,因为您希望在不进行任何分区的情况下获取前一行。另外,如果您想得到以秒为单位的时间差,您可能需要使用 unix_timestamp 而不是 datediff (返回天数差异)。

from pyspark.sql.functions import unix_timestamp, lag
from pyspark.sql import Window

df2 = df.withColumn(
    "timeDiff", 
    unix_timestamp(df.gpstime) - unix_timestamp(lag(df.gpstime, 1).over(Window.orderBy("gpstime")))
).withColumn(
    "speedDiff",
    df.gpsspeed - lag(df.gpsspeed, 1).over(Window.orderBy("gpstime"))
)

相关问题