我有两个Dataframe,一个间隔15分钟,另一个间隔15分钟 starttime
,安 endtime
,和一个值。我想找到马克斯 value
属于第一个Dataframe间隔内的第二个Dataframe的。
两个Dataframe的架构:
DF1 Schema
|-- start: timestamp (nullable = false)
|-- end: timestamp (nullable = false)
DF2 Schema
|-- starttime: timestamp (nullable = false)
|-- endtime: timestamp (nullable = false)
|-- value: Long(nullable = false)
我已经创建了这个解决方案,尽管我担心它的性能。我想知道是否有更好的方法来实现这一点,而不循环。我想加入,但因为我需要找到最大的 df2
在…的间隔内 df1
我不确定我会加入什么。
case class maxCaseClass(starttime:ZonedDateTime, endtime:ZonedDateTime, max: Long)
var maxInInterval = Seq.newBuilder[maxCaseClass]
val distinctIntervals = df1.select("start", "end").distinct().collect()
distinctIntervals.foreach(row => {
val starttime = row.getAs("start").asInstanceOf[Timestamp]
val endtime = row.getAs("end").asInstanceOf[Timestamp]
val maxDF = df2.filter(col("endtime") >= lit(starttime).cast(TimestampType) && col("starttime") <= lit(endtime).cast(TimestampType)).agg(max("value").as("max"))
maxInInterval += maxCaseClass(
LocalDateTime.parse(starttime.toString).atZone(ZoneOffset.UTC),
LocalDateTime.parse(endtime.toString).atZone(ZoneOffset.UTC),
maxDF.head().getAs("max").asInstanceOf[Long]
)
})
与其以一个序列结束,我只想在其中添加一个新列 df1
与 maxValue
,但我不知道如何做到这一点。
1条答案
按热度按时间axzmvihb1#
您可以在该间隔条件下将df1与df2连接起来,然后进行聚合:
或将sql与相关子查询一起使用: