Dynamic/variable offset in Spark SQL lead/lag functions

mwkjh3gx  posted on 2021-07-12  in  Spark

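Can we use an offset value in Spark SQL's lead/lag functions that depends on the value of a column?
Example: the following, which uses a fixed offset, works fine.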

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val sampleData = Seq( ("bob","Developer",125000),
  ("mark","Developer",108000),
  ("carl","Tester",70000),
  ("peter","Developer",185000),
  ("jon","Tester",65000),
  ("roman","Tester",82000),
  ("simon","Developer",98000),
  ("eric","Developer",144000),
  ("carlos","Tester",75000),
  ("henry","Developer",110000)).toDF("Name","Role","Salary")

val window = Window.orderBy("Role")

//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
  .when(col("SalaryDiff").>(0), "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff",salaryDifference)
  .withColumn("Trend", trend).show()

Now, in my use case the offset we have to pass depends on a particular column of integer type. This is what I would like to do:

val sampleData = Seq( ("bob","Developer",125000,2),
  ("mark","Developer",108000,3),
  ("carl","Tester",70000,3),
  ("peter","Developer",185000,2),
  ("jon","Tester",65000,1),
  ("roman","Tester",82000,1),
  ("simon","Developer",98000,2),
  ("eric","Developer",144000,3),
  ("carlos","Tester",75000,2),
  ("henry","Developer",110000,2)).toDF("Name","Role","Salary","ColumnForOffset")

val window = Window.orderBy("Role")

//Derive lag column for salary
//Intended call: this fails because lag expects an Int offset, not a Column
val laggingCol = lag(col("Salary"), col("ColumnForOffset")).over(window)

//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")

//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
  .when(col("SalaryDiff").>(0), "UP")
  .otherwise("DOWN")

sampleData.withColumn("LastSalary", laggingCol)
  .withColumn("SalaryDiff",salaryDifference)
  .withColumn("Trend", trend).show()

As expected, this throws an exception, because the offset only accepts an integer value. Is there some logic we could implement to achieve this?

ngynwnxp  #1

You can add a row number column and do a self-join on the row number and the offset, for example:

val df = sampleData.withColumn("rn", row_number().over(window))

val df2 = df.alias("t1").join(
    df.alias("t2"),
    expr("t1.rn = t2.rn + t1.ColumnForOffset"),
    "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")

df2.show
+------+---------+------+---------------+---+----------+
|  Name|     Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
|   bob|Developer|125000|              2|  1|      null|
|  mark|Developer|108000|              3|  2|      null|
| peter|Developer|185000|              2|  3|    125000|
| simon|Developer| 98000|              2|  4|    108000|
|  eric|Developer|144000|              3|  5|    108000|
| henry|Developer|110000|              2|  6|     98000|
|  carl|   Tester| 70000|              3|  7|     98000|
|   jon|   Tester| 65000|              1|  8|     70000|
| roman|   Tester| 82000|              1|  9|     65000|
|carlos|   Tester| 75000|              2| 10|     65000|
+------+---------+------+---------------+---+----------+
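To get back to the SalaryDiff and Trend columns from the question, the joined result can be fed through the same when/otherwise logic. The following is only a sketch of how that might look. The object name DynamicLagSketch, the build helper, and the local SparkSession are made up for illustration, and adding "Name" as a tiebreaker in the window ordering is my own assumption: ordering by "Role" alone leaves ties, so the row numbers (and therefore which row counts as "N rows back") are not guaranteed to be reproducible. Because of the different ordering, the row numbers here will not match the table above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, expr, row_number, when}

// Hypothetical wrapper object, only here to make the sketch self-contained.
object DynamicLagSketch {

  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val sampleData = Seq(
      ("bob", "Developer", 125000, 2),
      ("mark", "Developer", 108000, 3),
      ("carl", "Tester", 70000, 3),
      ("peter", "Developer", 185000, 2),
      ("jon", "Tester", 65000, 1),
      ("roman", "Tester", 82000, 1),
      ("simon", "Developer", 98000, 2),
      ("eric", "Developer", 144000, 3),
      ("carlos", "Tester", 75000, 2),
      ("henry", "Developer", 110000, 2)
    ).toDF("Name", "Role", "Salary", "ColumnForOffset")

    // Assumption: "Name" is added as a tiebreaker so row numbers are stable.
    // A window without partitionBy pulls all rows into a single partition.
    val window = Window.orderBy("Role", "Name")
    val df = sampleData.withColumn("rn", row_number().over(window))

    // Self-join from the answer: t2 is the row ColumnForOffset positions earlier.
    val withLast = df.alias("t1")
      .join(df.alias("t2"), expr("t1.rn = t2.rn + t1.ColumnForOffset"), "left")
      .selectExpr("t1.*", "t2.Salary as LastSalary")

    // Same difference/trend logic as in the question, applied to the join result.
    withLast
      .withColumn("SalaryDiff", col("Salary") - col("LastSalary"))
      .withColumn("Trend",
        when(col("SalaryDiff").isNull || col("SalaryDiff") === 0, "SAME")
          .when(col("SalaryDiff") > 0, "UP")
          .otherwise("DOWN"))
      .orderBy("rn")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dynamic-lag-sketch")
      .getOrCreate()
    build(spark).show()
    spark.stop()
  }
}
```

Rows whose offset points before the first row get a null LastSalary from the left join, so they fall into the "SAME" branch, just like the first row does with a fixed lag of 1.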
