scala 基于列连接2个数据集,如果不存在,则检查下一个值

vpfxa7rd  于 7个月前  发布在  Scala
关注(0)|答案(1)|浏览(83)

我有一个要求加入2个数据集的基础上关键和版本列。

case class Sample(id: String, version: Int, recordValue: String)

val left = Seq(
     Sample("1", 5, "left5"),
     Sample("1", 6, "left6")
   ).toDS()

val right = Seq(
     Sample("1", 6, "right6"),
     Sample("1", 5, "right5"),
     Sample("1", 3, "right3"),
     Sample("1", 2, "right2"),
     Sample("1", 1, "right1")
   ).toDS()

val result = left.jointWith(right,
    left("id") === right("id") &&
    left("version") === (right("version")+1),
    "left_outer"
    )

字符串
我必须将左DS与右DS连接起来,这样我就有了一个<current_version,previous_version>的元组,如下所述。如果右侧不存在以前的版本,那么我需要获取下一个以前的版本。例如:对于版本6(在左侧,我需要获取右侧的版本5
<version_6,version_5> <version_5,version_3>
当加入左侧的版本5时,我需要获得右侧的版本3,因为版本4不存在。如果上一个版本不存在,则上述连接无法获得下一个版本。
输出应如下所示

result.show()

result
================================
( (Sample("1", 6, "left6"), (Sample("1", 5, "right5"))
( (Sample("1", 5, "left5"), (Sample("1", 3, "right3"))


谢谢

x33g5p2x

x33g5p2x1#

您可以left-join相等idleft("version") > right("version")的数据集,然后是groupBymax_by作为聚合函数。

val result = left.joinWith(
    right,
    left("id") === right("id") && left("version") > right("version"),
    "left_outer"
  ).
  groupBy($"_1".as("curr")).agg(max_by($"_2", $"_2.version").as("prev"))

result.show
// +-------------+--------------+
// |         curr|          prev|
// +-------------+--------------+
// |{1, 5, left5}|{1, 3, right3}|
// |{1, 6, left6}|{1, 5, right5}|
// +-------------+--------------+

字符串
请注意,max_by仅在Spark 3.3+上可用。

相关问题