我有一个要求加入2个数据集的基础上关键和版本列。
case class Sample(id: String, version: Int, recordValue: String)
val left = Seq(
Sample("1", 5, "left5"),
Sample("1", 6, "left6")
).toDS()
val right = Seq(
Sample("1", 6, "right6"),
Sample("1", 5, "right5"),
Sample("1", 3, "right3"),
Sample("1", 2, "right2"),
Sample("1", 1, "right1")
).toDS()
val result = left.jointWith(right,
left("id") === right("id") &&
left("version") === (right("version")+1),
"left_outer"
)
字符串
我必须将左DS与右DS连接起来,这样我就有了一个<current_version,previous_version>的元组,如下所述。如果右侧不存在以前的版本,那么我需要获取下一个以前的版本。例如:对于版本6(在左侧,我需要获取右侧的版本5
<version_6,version_5> <version_5,version_3>
当加入左侧的版本5时,我需要获得右侧的版本3,因为版本4不存在。如果上一个版本不存在,则上述连接无法获得下一个版本。
输出应如下所示
result.show()
result
================================
( (Sample("1", 6, "left6"), (Sample("1", 5, "right5"))
( (Sample("1", 5, "left5"), (Sample("1", 3, "right3"))
型
谢谢
1条答案
按热度按时间x33g5p2x1#
您可以
left-join
相等id
和left("version") > right("version")
的数据集,然后是groupBy
和max_by
作为聚合函数。字符串
请注意,
max_by
仅在Spark 3.3+
上可用。