我在csv中有以下数据:
+----+----------+-----+
|name| timestamp|value|
+----+----------+-----+
| A|1604219844| 7|
| A|1604219845| 1|
| A|1604219846| 1|
| A|1604219847| 1|
| A|1604219848| 2|
| A|1604219849| 7|
| A|1604219850| 1|
| A|1604219851| 1|
| A|1604219852| 2|
| A|1604219853| 7|
| A|1604219854| 1|
| A|1604219855| 1|
+----+----------+-----+
为了跟踪顺序值的变化,到目前为止,我已经实现了以下代码:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as psf
spark = SparkSession.builder.getOrCreate()
data = spark.read.option("header","true").csv("sample_data.csv")
data.show()
w = Window.orderBy("timestamp")
value_lag = lag('value').over(w)
timestamp_lag = lag('timestamp').over(w)
df_final = data.withColumn('prev_timestamp', timestamp_lag).withColumn('prev_value', value_lag)\
.withColumn("changed", (data.value != psf.lag('value').over(w)).cast('int'))\
.withColumn("diff",data.value - psf.lag('value').over(w))
df_final.show()
上述代码的输出如下:
+----+----------+-----+--------------+----------+-------+----+
|name| timestamp|value|prev_timestamp|prev_value|changed|diff|
+----+----------+-----+--------------+----------+-------+----+
| A|1604219844| 7| null| null| null|null|
| A|1604219845| 1| 1604219844| 7| 1|-6.0|
| A|1604219846| 1| 1604219845| 1| 0| 0.0|
| A|1604219847| 1| 1604219846| 1| 0| 0.0|
| A|1604219848| 2| 1604219847| 1| 1| 1.0|
| A|1604219849| 7| 1604219848| 2| 1| 5.0|
| A|1604219850| 1| 1604219849| 7| 1|-6.0|
| A|1604219851| 1| 1604219850| 1| 0| 0.0|
| A|1604219852| 2| 1604219851| 1| 1| 1.0|
| A|1604219853| 7| 1604219852| 2| 1| 5.0|
| A|1604219854| 1| 1604219853| 7| 1|-6.0|
| A|1604219855| 1| 1604219854| 1| 0| 0.0|
+----+----------+-----+--------------+----------+-------+----+
我想在上面的dataframe中实现一个额外的列来集群更改 1 -> 2 -> 7 -> 1
连续发生的
+----+----------+-----+--------------+----------+-------+----+---------------+
|name| timestamp|value|prev_timestamp|prev_value|changed|diff| keyword|
+----+----------+-----+--------------+----------+-------+----+---------------+
| A|1604219844| 7| null| null| null|null| null|
| A|1604219845| 1| 1604219844| 7| 1| -6|Insert1-Update1|
| A|1604219846| 1| 1604219845| 1| 0| 0| null|
| A|1604219847| 1| 1604219846| 1| 0| 0| null|
| A|1604219848| 2| 1604219847| 1| 1| 1| Insert2|
| A|1604219849| 7| 1604219848| 2| 1| 5|Insert2-Update1|
| A|1604219850| 1| 1604219849| 7| 1| -6|Insert2-Update2|
| A|1604219851| 1| 1604219850| 1| 0| 0| null|
| A|1604219852| 2| 1604219851| 1| 1| 1| Insert3|
| A|1604219853| 7| 1604219852| 2| 1| 5|Insert3-Update1|
| A|1604219854| 1| 1604219853| 7| 1| -6|Insert3-Update2|
| A|1604219855| 1| 1604219854| 1| 0| 0| null|
+----+----------+-----+--------------+----------+-------+----+---------------+
这个专栏的想法是,它可以帮助根据关键字对专栏进行分组 Insert2,Insert3
然后在每一个里面 Insert2-Update1,Insert2-Update2
从而得到最终需要得到的聚类结果
+----+----------+-----+--------------+----------+------------------------+------------------------+------------------------+
|name| timestamp|value|prev_timestamp|prev_value|changed_timestamp_1_to_2|changed_timestamp_2_to_7|changed_timestamp_7_to_1|
+----+----------+-----+--------------+----------+------------------------+------------------------+------------------------+
| A|1604219848| 2| 1604219847| 1| 1604219848| 1604219849| 1604219850|
| A|1604219852| 2| 1604219851| 1| 1604219852| 1604219853| 1604219854|
+----+----------+-----+--------------+----------+------------------------+------------------------+------------------------+
谢谢你的帮助
暂无答案!
目前还没有任何答案,快来回答吧!