sql - How to compare two DataFrames with the same structure to compute row differences

Posted by hfsqlsce on 2021-05-27 in Hadoop

I have the following two DataFrames with the same structure, keyed by the same id column.

val originalDF = Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",600,80000),(3,"rishi","ahmedabad",510,65000))
                .toDF("id","name","city","credit_score","credit_limit")
scala> originalDF.show(false)
+---+------+---------+------------+------------+
|id |name  |city     |credit_score|credit_limit|
+---+------+---------+------------+------------+
|1  |gaurav|jaipur   |550         |70000       |
|2  |sunil |noida    |600         |80000       |
|3  |rishi |ahmedabad|510         |65000       |
+---+------+---------+------------+------------+
val changedDF= Seq((1,"gaurav","jaipur",550,70000),(2,"sunil","noida",650,90000),(4,"Joshua","cochin",612,85000))
                .toDF("id","name","city","credit_score","credit_limit")
scala> changedDF.show(false)
+---+------+------+------------+------------+
|id |name  |city  |credit_score|credit_limit|
+---+------+------+------------+------------+
|1  |gaurav|jaipur|550         |70000       |
|2  |sunil |noida |650         |90000       |
|4  |Joshua|cochin|612         |85000       |
+---+------+------+------------+------------+

So I wrote a UDF to find which column values changed:

val diff = udf((col: String, c1: String, c2: String) => if (c1 == c2) "" else col )
val somedf=changedDF.alias("a").join(originalDF.alias("b"), col("a.id") === col("b.id")).withColumn("diffcolumn", split(concat_ws(",",changedDF.columns.map(x => diff(lit(x), changedDF(x), originalDF(x))):_*),","))
scala> somedf.show(false)
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+
|id |name  |city  |credit_score|credit_limit|id |name  |city  |credit_score|credit_limit|diffcolumn                        |
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+
|1  |gaurav|jaipur|550         |70000       |1  |gaurav|jaipur|550         |70000       |[, , , , ]                        |
|2  |sunil |noida |650         |90000       |2  |sunil |noida |600         |80000       |[, , , credit_score, credit_limit]|
+---+------+------+------------+------------+---+------+------+------------+------------+----------------------------------+

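But I can't get the id and diffcolumn on their own. If I do somedf.select('id), I get an ambiguity error, because the joined table contains two id columns. What I want is the id of each row whose values changed, together with an array of the names of all changed columns. For example, in changedDF the credit_score and credit_limit of id=2, name=sunil have changed, so I want the resulting DataFrame to look like this: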

+---+----------------------------------+
|id |diffcolumn                        |
+---+----------------------------------+
|2  |[, , , credit_score, credit_limit]|
+---+----------------------------------+

Can someone tell me how to get the id and the changed columns separately from the DataFrame?

Answer #1 (oknwwptz)

Change your join condition from col("a.id") === col("b.id") to "id"; then there will be only one id column.
You also don't need alias("a") and alias("b"). So your join changes from

changedDF.alias("a").join(originalDF.alias("b"), col("a.id") === col("b.id"))

to
changedDF.join(originalDF, "id")
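Putting this together, here is a minimal end-to-end sketch in spark-shell/script style. It assumes Spark 3.x (for functions.filter) and swaps the question's UDF for a null-safe comparison (<=>) with built-in functions; valueCols and changedNames are illustrative names, not part of the original answer. The key column id is left out of the comparison because the join guarantees it is equal, and the final select avoids the duplicated non-key columns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("row-diff").getOrCreate()
import spark.implicits._

val originalDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 600, 80000), (3, "rishi", "ahmedabad", 510, 65000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")
val changedDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 650, 90000), (4, "Joshua", "cochin", 612, 85000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")

// Every column except the join key; "id" is equal on both sides by construction.
val valueCols = changedDF.columns.filter(_ != "id")

// For each column: its name if the two values differ (null-safe), otherwise null.
val changedNames = valueCols.map(c => when(!(changedDF(c) <=> originalDF(c)), lit(c)))

val result = changedDF
  .join(originalDF, "id")                                                        // using-join: a single id column
  .withColumn("diffcolumn", filter(array(changedNames: _*), x => x.isNotNull))   // drop the null entries
  .filter(size(col("diffcolumn")) > 0)                                           // keep only rows that changed
  .select("id", "diffcolumn")

result.show(false)
```

For the sample data this should leave a single row: id 2 with [credit_score, credit_limit]. Unlike the question's desired output, the array contains only the changed names, without empty placeholders.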

Answer #2 (b4lqfgs4)

FYI, this kind of diff can easily be done with the spark-extension package. It provides a diff transformation that builds the complex query for you:

import uk.co.gresearch.spark.diff._

val options = DiffOptions.default.withChangeColumn("changes")  // needed to get the optional 'changes' column
val diff = originalDF.diff(changedDF, options, "id")

diff.show(false)
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+
|diff|changes                     |id |left_name|right_name|left_city|right_city|left_credit_score|right_credit_score|left_credit_limit|right_credit_limit|
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+
|N   |[]                          |1  |gaurav   |gaurav    |jaipur   |jaipur    |550              |550               |70000            |70000             |
|I   |null                        |4  |null     |Joshua    |null     |cochin    |null             |612               |null             |85000             |
|C   |[credit_score, credit_limit]|2  |sunil    |sunil     |noida    |noida     |600              |650               |80000            |90000             |
|D   |null                        |3  |rishi    |null      |ahmedabad|null      |510              |null              |65000            |null              |
+----+----------------------------+---+---------+----------+---------+----------+-----------------+------------------+-----------------+------------------+

diff.select($"id", $"diff", $"changes").show(false)
+---+----+----------------------------+
|id |diff|changes                     |
+---+----+----------------------------+
|1  |N   |[]                          |
|4  |I   |null                        |
|2  |C   |[credit_score, credit_limit]|
|3  |D   |null                        |
+---+----+----------------------------+

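While this is a simple example, diffing DataFrames can become complicated once wide schemas and null values are involved. The package is well tested, so you don't have to worry about getting such a query right yourself.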
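If adding a dependency is not an option, the rough shape of such a query can be written by hand. The sketch below is not the package's implementation or API. It only mimics the N/C/D/I codes and the left_/right_ column naming of the output above, using a full outer join on id. It assumes Spark 3.x (for functions.filter) and does not handle edge cases such as null or duplicate ids.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
import spark.implicits._

val originalDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 600, 80000), (3, "rishi", "ahmedabad", 510, 65000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")
val changedDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 650, 90000), (4, "Joshua", "cochin", 612, 85000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")

// Prefix every column so both sides survive the join without ambiguity.
val left  = originalDF.select(originalDF.columns.map(c => col(c).as(s"left_$c")): _*)
val right = changedDF.select(changedDF.columns.map(c => col(c).as(s"right_$c")): _*)

val valueCols = originalDF.columns.filter(_ != "id")
val bothSides = col("left_id").isNotNull && col("right_id").isNotNull

// Names of the columns whose values differ (null-safe); null for rows present on only one side.
val changes = when(bothSides,
  filter(array(valueCols.map(c => when(!(col(s"left_$c") <=> col(s"right_$c")), lit(c))): _*), x => x.isNotNull))

val result = left
  .join(right, col("left_id") === col("right_id"), "full_outer")
  .withColumn("id", coalesce(col("left_id"), col("right_id")))
  .withColumn("changes", changes)
  .withColumn("diff",
    when(col("left_id").isNull, lit("I"))          // only in changedDF: inserted
      .when(col("right_id").isNull, lit("D"))      // only in originalDF: deleted
      .when(size(col("changes")) > 0, lit("C"))    // on both sides, some value changed
      .otherwise(lit("N")))                        // on both sides, nothing changed

result.select("id", "diff", "changes").orderBy("id").show(false)
```

On the sample data this should classify id 1 as N, 2 as C, 3 as D and 4 as I, as in the package output above.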

Answer #3 (e0bqpujr)

Try this:

val aliasedChangedDF = changedDF.as("a")
val aliasedOriginalDF = originalDF.as("b")
val diff = udf((col: String, c1: String, c2: String) => if (c1 == c2) "" else col)
val somedf = aliasedChangedDF.join(aliasedOriginalDF, col("a.id") === col("b.id")).withColumn("diffcolumn", split(concat_ws(",", changedDF.columns.map(x => diff(lit(x), changedDF(x), originalDF(x))): _*), ","))
somedf.select(col("a.id").as("id"), col("diffcolumn"))
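The select above still returns unchanged rows such as id 1. Here is a variant sketch that assumes Spark 2.4+ (for array_remove). It resolves both sides through the aliases ("a.x", "b.x") and builds the array directly instead of the concat_ws/split round-trip. It then drops the empty strings and keeps only rows that actually changed. The explicit cast to string is an addition so the String-typed UDF always receives strings. onlyChanged is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("alias-diff").getOrCreate()
import spark.implicits._

val originalDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 600, 80000), (3, "rishi", "ahmedabad", 510, 65000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")
val changedDF = Seq((1, "gaurav", "jaipur", 550, 70000), (2, "sunil", "noida", 650, 90000), (4, "Joshua", "cochin", 612, 85000))
  .toDF("id", "name", "city", "credit_score", "credit_limit")

// Same logic as the answer's UDF: the column name if the two values differ, else "".
val diff = udf((name: String, c1: String, c2: String) => if (c1 == c2) "" else name)

val somedf = changedDF.as("a")
  .join(originalDF.as("b"), col("a.id") === col("b.id"))
  .withColumn("diffcolumn", array(changedDF.columns.map { c =>
    // Resolve each side through its alias; cast so the UDF gets string inputs.
    diff(lit(c), col(s"a.$c").cast("string"), col(s"b.$c").cast("string"))
  }: _*))

val onlyChanged = somedf
  .select(col("a.id").as("id"), array_remove(col("diffcolumn"), "").as("diffcolumn")) // drop "" entries
  .filter(size(col("diffcolumn")) > 0)                                                 // keep changed rows only

onlyChanged.show(false)
```

Keeping the empty strings (i.e. skipping array_remove) would reproduce the placeholder-style array from the question instead.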
