sql—如何在特定时间范围内从sparkDataframe中选择最大值

a8jjtwal  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(296)

我有一些货物的记录。具有列的记录集 container_no , origin , destination , shipment_dt 以及 volume .
有多个记录具有相同的 container_no 有可能同一个集装箱在不同的日期装运,但如果 shipment_dt 在时间跨度的10天内,然后检查 origincontainer_no ,如果所有具有不同来源的记录,则删除具有相同容器号且在10天时间跨度内的所有记录,否则选择具有最高卷的记录。
请注意:我们将根据第一次发生的事件来决定10天的时间跨度 container_no .
样本输入:

预期产量:

选择数据的起止条件:

我已经写了一个查询,以获得10天的时间范围,但不知道如何比较出发地和目的地,并获得最高数量的记录。
创建Dataframe的输入查询示例:

val Input_DF = spark.sql("""
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-10' AS shipment_dt , 30 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-12' AS shipment_dt , 10 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-25' AS shipment_dt , 20 as volume UNION
    SELECT '12345' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-26' AS shipment_dt , 10 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-15' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-16' AS shipment_dt , 20 as volume UNION
    SELECT '12346' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-17' AS shipment_dt , 50 as volume UNION
    SELECT '12347' AS container_no , 'India' AS origin ,'China' AS destination , '2020-10-18' AS shipment_dt , 20 as volume UNION
    SELECT '12347' AS container_no , 'Nepal' AS origin ,'China' AS destination , '2020-10-19' AS shipment_dt , 21 as volume""")

Input_DF.createOrReplaceTempView("Input_DF")

查询以创建数据的10天时间范围:

val output_df = spark.sql("""
            SELECT
                      B.* ,
                      CASE
                          WHEN from_prev BETWEEN 0 AND 9
                          THEN 1
                          ELSE 0
                      END                     AS recent ,
                      floor(from_first / 10 ) AS recent_group
                  FROM
                      (
                          SELECT
                              A.*,
                              NVL(DATEDIFF(shipment_dt,FIRST(shipment_dt) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) AS from_first,
                              NVL(DATEDIFF(shipment_dt,lag(shipment_dt,1) over(partition BY container_no
                              ORDER BY shipment_dt ASC)) ,0) from_prev
                          FROM
                              Input_DF A) B 
                  ORDER BY
                    container_no,
                    shipment_dt""")

在示例输入屏幕截图中,我添加了一个额外的列来解释一行与另一行中具有相同容器但日期不同的记录。提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题