pyspark遍历窗口计算累计最大值

83qze16e  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(389)

我有一个带有车辆ID,时间戳和里程表的数据框。一些里程表读数可能为空。我想创建一个新的列,它是每个vehicleid的时间戳的当前里程表,如果为null,则使用以前的none null里程表。
例子

+------------+------------------------+-----------+-------------------------+
|vehicleID   |startDateTimeUtc        |Odometer   |NewColumn-CurrentOdometer|
+------------+------------------------+-----------+-------------------------+
|a           |2019-04-11T16:27:32+0000|10000      |10000                    |
|a           |2019-04-11T16:27:32+0000|15000      |15000                    |
|a           |2019-04-11T16:43:10+0000|null       |15000                    |
|a           |2019-04-11T20:13:52+0000|null       |15000                    |
|a           |2019-04-12T14:50:35+0000|null       |15000                    |
|a           |2019-04-12T18:53:19+0000|20000      |20000                    |
|b           |2019-04-12T19:06:41+0000|350000     |350000                   |
|b           |2019-04-12T19:17:15+0000|370000     |370000                   |
|b           |2019-04-12T19:30:32+0000|null       |370000                   |
|b           |2019-04-12T20:19:41+0000|380000     |380000                   |
|b           |2019-04-12T20:42:26+0000|null       |380000                   |

我知道我需要使用窗口功能。我可能也需要使用“lag”,但我怎么能不只是查找以前的记录呢?(见示例vehicleid a)非常感谢!

my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc")
jv2fixgn

jv2fixgn1#

另一种选择-使用 max 带窗框 unboundedpreceding and currentrow ###加载提供的测试数据

val data =
      """
        |vehicleID   |startDateTimeUtc        |Odometer
        |a           |2019-04-11T16:27:32+0000|10000
        |a           |2019-04-11T16:27:32+0000|15000
        |a           |2019-04-11T16:43:10+0000|null
        |a           |2019-04-11T20:13:52+0000|null
        |a           |2019-04-12T14:50:35+0000|null
        |a           |2019-04-12T18:53:19+0000|20000
        |b           |2019-04-12T19:06:41+0000|350000
        |b           |2019-04-12T19:17:15+0000|370000
        |b           |2019-04-12T19:30:32+0000|null
        |b           |2019-04-12T20:19:41+0000|380000
        |b           |2019-04-12T20:42:26+0000|null
      """.stripMargin
    val stringDS1 = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +---------+------------------------+--------+
      * |vehicleID|startDateTimeUtc        |Odometer|
      * +---------+------------------------+--------+
      * |a        |2019-04-11T16:27:32+0000|10000   |
      * |a        |2019-04-11T16:27:32+0000|15000   |
      * |a        |2019-04-11T16:43:10+0000|null    |
      * |a        |2019-04-11T20:13:52+0000|null    |
      * |a        |2019-04-12T14:50:35+0000|null    |
      * |a        |2019-04-12T18:53:19+0000|20000   |
      * |b        |2019-04-12T19:06:41+0000|350000  |
      * |b        |2019-04-12T19:17:15+0000|370000  |
      * |b        |2019-04-12T19:30:32+0000|null    |
      * |b        |2019-04-12T20:19:41+0000|380000  |
      * |b        |2019-04-12T20:42:26+0000|null    |
      * +---------+------------------------+--------+
      *
      * root
      * |-- vehicleID: string (nullable = true)
      * |-- startDateTimeUtc: string (nullable = true)
      * |-- Odometer: integer (nullable = true)
      */

计算有序窗口中的最大值

val w = Window.partitionBy("vehicleID").orderBy("startDateTimeUtc")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    df1.withColumn("NewColumn-CurrentOdometer",
      max("Odometer").over(w))
      .show(false)

    /**
      * +---------+------------------------+--------+-------------------------+
      * |vehicleID|startDateTimeUtc        |Odometer|NewColumn-CurrentOdometer|
      * +---------+------------------------+--------+-------------------------+
      * |a        |2019-04-11T16:27:32+0000|10000   |10000                    |
      * |a        |2019-04-11T16:27:32+0000|15000   |15000                    |
      * |a        |2019-04-11T16:43:10+0000|null    |15000                    |
      * |a        |2019-04-11T20:13:52+0000|null    |15000                    |
      * |a        |2019-04-12T14:50:35+0000|null    |15000                    |
      * |a        |2019-04-12T18:53:19+0000|20000   |20000                    |
      * |b        |2019-04-12T19:06:41+0000|350000  |350000                   |
      * |b        |2019-04-12T19:17:15+0000|370000  |370000                   |
      * |b        |2019-04-12T19:30:32+0000|null    |370000                   |
      * |b        |2019-04-12T20:19:41+0000|380000  |380000                   |
      * |b        |2019-04-12T20:42:26+0000|null    |380000                   |
      * +---------+------------------------+--------+-------------------------+
      */
8ftvxx2r

8ftvxx2r2#

使用 last window function ignorenulls标志为 True ,中间有行 unboundedPreceeding and currentRow .

df.show(20,False)

# +---------+------------------------+--------+

# |vehicleid|startdatetimeutc        |odometer|

# +---------+------------------------+--------+

# |a        |2019-04-11T16:27:32+0000|10000   |

# |a        |2019-04-11T16:27:32+0000|15000   |

# |a        |2019-04-11T16:43:10+0000|null    |

# |a        |2019-04-11T20:13:52+0000|null    |

# |a        |2019-04-12T14:50:35+0000|null    |

# |a        |2019-04-12T18:53:19+0000|20000   |

# |b        |2019-04-12T19:06:41+0000|350000  |

# |b        |2019-04-12T19:17:15+0000|370000  |

# |b        |2019-04-12T19:30:32+0000|null    |

# |b        |2019-04-12T20:19:41+0000|380000  |

# |b        |2019-04-12T20:42:26+0000|null    |

# +---------+------------------------+--------+

import sys
my_window = Window.partitionBy("vehicleID").orderBy("vehicleID","startDateTimeUtc").rowsBetween(-sys.maxsize,0)

df.withColumn("NewColumn-CurrentOdometer",last(col("Odometer"),True).over(my_window)).orderBy("vehicleid").show(20,False)

# +---------+------------------------+--------+-------------------------+

# |vehicleid|startdatetimeutc        |odometer|NewColumn-CurrentOdometer|

# +---------+------------------------+--------+-------------------------+

# |a        |2019-04-11T16:27:32+0000|10000   |10000                    |

# |a        |2019-04-11T16:27:32+0000|15000   |15000                    |

# |a        |2019-04-11T16:43:10+0000|null    |15000                    |

# |a        |2019-04-11T20:13:52+0000|null    |15000                    |

# |a        |2019-04-12T14:50:35+0000|null    |15000                    |

# |a        |2019-04-12T18:53:19+0000|20000   |20000                    |

# |b        |2019-04-12T19:06:41+0000|350000  |350000                   |

# |b        |2019-04-12T19:17:15+0000|370000  |370000                   |

# |b        |2019-04-12T19:30:32+0000|null    |370000                   |

# |b        |2019-04-12T20:19:41+0000|380000  |380000                   |

# |b        |2019-04-12T20:42:26+0000|null    |380000                   |

# +---------+------------------------+--------+-------------------------+

相关问题