I am trying to find the changes in each column attribute's value as follows:
from pyspark.sql import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("attribute").orderBy(df_series["time"].asc())

# F.lag(col, -1) looks one row ahead (equivalent to F.lead(col, 1))
final_df_series = df_series.withColumn("lagdate", F.lag(df_series["time"], -1).over(windowSpec))\
    .withColumn("value_lagvalue$df", F.lag(df_series["value"], -1).over(windowSpec))\
    .withColumn("value_grp$df", (F.col("value") - F.col("value_lagvalue$df")).cast("int"))\
    .filter(F.col("value_grp$df") != 0).drop("value_grp$df")\
    .select("attribute", "lagdate", "value_lagvalue$df").persist()
The DataFrame output of the above code is:
+---------+-------------------+-----------------+
|attribute| lagdate|value_lagvalue$df|
+---------+-------------------+-----------------+
| column93|2020-09-07 10:29:24| 3|
| column93|2020-09-07 10:29:38| 1|
| column93|2020-09-07 10:31:08| 0|
| column94|2020-09-07 10:29:26| 3|
| column94|2020-09-07 10:29:40| 1|
| column94|2020-09-07 10:31:18| 0|
|column281|2020-09-07 10:29:34| 3|
|column281|2020-09-07 10:29:54| 0|
|column281|2020-09-07 10:31:08| 3|
|column281|2020-09-07 10:31:13| 0|
|column281|2020-09-07 10:35:24| 3|
|column281|2020-09-07 10:36:08| 0|
|column282|2020-09-07 10:41:13| 3|
|column282|2020-09-07 10:49:24| 1|
|column284|2020-09-07 10:51:08| 1|
|column284|2020-09-07 11:01:13| 0|
|column285|2020-09-07 11:21:13| 1|
+---------+-------------------+-----------------+
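The same look-one-row-ahead change detection can be sketched in plain pandas with `groupby().shift(-1)`, which mirrors what `F.lag(col, -1).over(windowSpec)` does per partition. This is a minimal sketch on hypothetical sample data (the `df_series` values below are invented for illustration):

```python
import pandas as pd

# Hypothetical series: one attribute with repeated readings.
df_series = pd.DataFrame({
    "attribute": ["column93"] * 5,
    "time": ["2020-09-07 10:29:20", "2020-09-07 10:29:24",
             "2020-09-07 10:29:38", "2020-09-07 10:29:40",
             "2020-09-07 10:31:08"],
    "value": [3, 3, 1, 1, 0],
})
df_series = df_series.sort_values(["attribute", "time"])

grp = df_series.groupby("attribute")
# shift(-1) looks one row ahead within each attribute,
# mirroring F.lag(col, -1) over the partitioned window.
df_series["lagdate"] = grp["time"].shift(-1)
df_series["value_lagvalue"] = grp["value"].shift(-1)

# Keep only the change points: rows where the next value differs.
changes = df_series[df_series["value"] != df_series["value_lagvalue"]] \
    .dropna(subset=["value_lagvalue"])
print(changes[["attribute", "lagdate", "value_lagvalue"]])
```

Note that `value_lagvalue` becomes float here because `shift(-1)` introduces a trailing NaN per group.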
I want to transform it into the structure below:
attribute,timestamp_3,timestamp_1,timestamp_0
column93,2020-09-07 10:29:24,2020-09-07 10:29:38,2020-09-07 10:31:08
column94,2020-09-07 10:29:26,2020-09-07 10:29:40,2020-09-07 10:31:18
column281,2020-09-07 10:29:34,null,2020-09-07 10:29:54
column281,2020-09-07 10:31:08,null,2020-09-07 10:31:13
column281,2020-09-07 10:35:24,null,2020-09-07 10:36:08
column282,2020-09-07 10:41:13,2020-09-07 10:49:24,null
column284,null,2020-09-07 10:51:08,2020-09-07 11:01:13
column285,null,2020-09-07 11:21:13,null
Thanks for your help. A PySpark solution would be best, since it is optimized for large DataFrames of this kind, but a Pandas approach would also be helpful.
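One way to get the wide layout is to first tag each "cycle" of readings (so the repeated 3→0 runs for `column281` land on separate rows) and then pivot on the value. This is only a sketch in pandas, under the assumption that a new cycle starts whenever the value stops decreasing within an attribute; the sample rows are a subset of the output above with a simplified `value` column:

```python
import pandas as pd

# Subset of the change rows from the question (value column simplified).
data = [
    ("column93",  "2020-09-07 10:29:24", 3),
    ("column93",  "2020-09-07 10:29:38", 1),
    ("column93",  "2020-09-07 10:31:08", 0),
    ("column281", "2020-09-07 10:29:34", 3),
    ("column281", "2020-09-07 10:29:54", 0),
    ("column281", "2020-09-07 10:31:08", 3),
    ("column281", "2020-09-07 10:31:13", 0),
    ("column285", "2020-09-07 11:21:13", 1),
]
df = pd.DataFrame(data, columns=["attribute", "lagdate", "value"])

# Assumption: values fall 3 -> 1 -> 0 within a cycle, so a non-negative
# step relative to the previous row of the same attribute opens a new cycle.
df["cycle"] = (
    df.groupby("attribute")["value"]
      .transform(lambda s: (s.diff().fillna(-1) >= 0).cumsum())
)

# Pivot each (attribute, cycle) pair into one row with a column per value.
wide = (
    df.pivot_table(index=["attribute", "cycle"], columns="value",
                   values="lagdate", aggfunc="first")
      .rename(columns=lambda v: f"timestamp_{v}")
      .reset_index()
      .drop(columns="cycle")
)
print(wide)
```

Missing timestamps (e.g. `column281` cycles with no value 1, or `column285` with only a 1) come out as NaN, matching the nulls in the desired structure. In PySpark the same idea would be a window-based cycle id followed by `groupBy("attribute", "cycle").pivot("value")`, but the pandas version above is easier to verify on small data.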
Update:
This post seems to achieve almost the same result; hoping the community can help me reach the desired output:
PySpark explode list into multiple columns based on name