比较当前行和上一行的值,如果spark中需要,比较后一行的值

nnvyjq4y  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(439)

我正在尝试根据值选择一列的值​​其他行和其他列的。

scala> val df = Seq((1,"051",0,0,10,0),(1,"052",0,0,0,0),(2,"053",10,0,10,0),(2,"054",0,0,10,0),(3,"055",100,50,0,0),(3,"056",100,10,0,0),(3,"057",100,20,0,0),(4,"058",70,15,0,0),(4,"059",70,15,0,20),(4,"060",70,15,0,0)).toDF("id","code","value_1","value_2","value_3","Value_4")
scala> df.show()
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0| 
|  3| 057|    100|     20|      0|      0| 
|  4| 058|     70|     15|      0|      0| 
|  4| 059|     70|     15|      0|     20| 
|  4| 060|     70|     15|      0|      0| 
+---+----+-------+-------+-------+-------+

计算逻辑:
按照以下步骤为id选择代码
对于每列n(值\u 1、值\u 2、值\u 3、值\u 4),执行
对于相同的id,请在“值”列中查找最大值
如果重复最大值,则计算下一列
否则,如果没有重复地找到最大值,则取具有最大值的列的id和代码。不再需要计算以下列。
预期产量:

+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  2| 053|     10|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
+---+----+-------+-------+-------+-------+

对于id 3:
它的代码是055,056,057
值_1具有​​所有三个代码都是100,所以最大值是100,但是对于所有三个代码都重复,我不能选择一个代码。
必须计算值_2列,该列具有​​每种代码分别为50、10和20
所以三个代码中的最大值是50,并且是唯一的。
已选择id为3、代码为055的记录
请帮忙。

relj7zay

relj7zay1#

如果数据的格式保证算法始终选择一列,则以下代码将生成预期结果:

val w = Window.partitionBy("id")

var df2 = df;
val cols = Seq("value_1", "value_2", "value_3", "value_4")
for( col <- cols) {
  df2 = df2.withColumn(s"${col}_max", max(col).over(w))
    .withColumn(s"${col}_avg", avg(col).over(w))
}

var sel = ""
for( col <- cols) {
  sel += s"(${col}_max <> ${col}_avg and ${col} = ${col}_max) or"
}
sel.dropRight(2)

df2.filter(sel).select("id", ("code" +: cols):_*).sort("id", "code").show
brjng4g3

brjng4g32#

您可以将值\u1到4放在一个结构中,并使用window对其groupedby id列调用max函数

scala> df.show
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0|
|  3| 057|    100|     20|      0|      0|
|  4| 058|     70|     15|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  4| 060|     70|     15|      0|      0|
+---+----+-------+-------+-------+-------+

scala> val dfWithVals = df.withColumn("values", struct($"value_1", $"value_2", $"value_3", $"value_4"))
dfWithVals: org.apache.spark.sql.DataFrame = [id: int, code: string ... 5 more fields]

scala> dfWithVals.show
+---+----+-------+-------+-------+-------+---------------+
| id|code|value_1|value_2|value_3|Value_4|         values|
+---+----+-------+-------+-------+-------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  1| 052|      0|      0|      0|      0|   [0, 0, 0, 0]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]|
|  2| 054|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|
|  3| 056|    100|     10|      0|      0|[100, 10, 0, 0]|
|  3| 057|    100|     20|      0|      0|[100, 20, 0, 0]|
|  4| 058|     70|     15|      0|      0| [70, 15, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|
|  4| 060|     70|     15|      0|      0| [70, 15, 0, 0]|
+---+----+-------+-------+-------+-------+---------------+

scala> val overColumns =org.apache.spark.sql.expressions.Window.partitionBy("id")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@de0daca

scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").show
+---+----+-------+-------+-------+-------+---------------+---------------+      
| id|code|value_1|value_2|value_3|Value_4|         values|        maxvals|
+---+----+-------+-------+-------+-------+---------------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|[100, 50, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|[70, 15, 0, 20]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]| [10, 0, 10, 0]|
+---+----+-------+-------+-------+-------+---------------+---------------+

scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").drop("values", "maxvals").show
+---+----+-------+-------+-------+-------+                                      
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  2| 053|     10|      0|     10|      0|
+---+----+-------+-------+-------+-------+

相关问题