根据不同Dataframe的间隔查找Dataframe中的最大值

b1uwtaje  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(341)

我有两个Dataframe,一个间隔15分钟,另一个间隔15分钟 starttime ,安 endtime ,和一个值。我想找到马克斯 value 属于第一个Dataframe间隔内的第二个Dataframe的。
两个Dataframe的架构:

DF1 Schema
 |-- start: timestamp (nullable = false)
 |-- end: timestamp (nullable = false)

 DF2 Schema
 |-- starttime: timestamp (nullable = false)
 |-- endtime: timestamp (nullable = false)
 |-- value: Long(nullable = false)

我已经创建了这个解决方案,尽管我担心它的性能。我想知道是否有更好的方法来实现这一点,而不循环。我想加入,但因为我需要找到最大的 df2 在…的间隔内 df1 我不确定我会加入什么。

case class maxCaseClass(starttime:ZonedDateTime, endtime:ZonedDateTime, max: Long)
var maxInInterval = Seq.newBuilder[maxCaseClass]
val distinctIntervals = df1.select("start", "end").distinct().collect()
distinctIntervals.foreach(row => {
  val starttime = row.getAs("start").asInstanceOf[Timestamp]
  val endtime = row.getAs("end").asInstanceOf[Timestamp]
  val maxDF = df2.filter(col("endtime") >= lit(starttime).cast(TimestampType) && col("starttime") <= lit(endtime).cast(TimestampType)).agg(max("value").as("max"))
  maxInInterval += maxCaseClass(
                   LocalDateTime.parse(starttime.toString).atZone(ZoneOffset.UTC), 
                   LocalDateTime.parse(endtime.toString).atZone(ZoneOffset.UTC), 
                   maxDF.head().getAs("max").asInstanceOf[Long]
                   )
})

与其以一个序列结束,我只想在其中添加一个新列 df1maxValue ,但我不知道如何做到这一点。

axzmvihb

axzmvihb1#

您可以在该间隔条件下将df1与df2连接起来,然后进行聚合:

val result = DF1.join(
  DF2,
  (col("end") >= col("endtime")) && (col("starttime") >= col("start"))
).groupBy("start", "end")
.agg(max("value").as("max_value"))

或将sql与相关子查询一起使用:

DF1.createOrReplaceTempView("df1")
DF2.createOrReplaceTempView("df2")

val result = spark.sql("""
select  *,
        (select max(value) from df2 where end >= endtime and starttime >= start) as max_value
from  df1
""")

相关问题