Hive query to get counts for the weeks missing in between

qkf9rpyu asked on 2021-05-29 in Hadoop

I have a table like the one below:

id      week    count   
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30    
A100    201013  36    
A100    201015  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

Here we can see that the following weeks are missing:
201014 is missing
201016 is missing
201020, 201021 and 201022 are missing
My requirement is that whenever a week is missing, we need to show the previous week's count.
In this case the output should be:

id      week    count
A100    201008  2    
A100    201009  9    
A100    201010  16    
A100    201011  23    
A100    201012  30   
A100    201013  36    
A100    201014  36    
A100    201015  43    
A100    201016  43    
A100    201017  50    
A100    201018  57    
A100    201019  63    
A100    201020  63
A100    201021  63    
A100    201022  63    
A100    201023  70    
A100    201024  82    
A100    201025  88    
A100    201026  95    
A100    201027  102

How can I achieve this using Hive/PySpark?


tyky79it1#

PySpark solution
Sample data:

df = spark.createDataFrame([(1,201901,10),
                            (1,201903,9),
                            (1,201904,21),
                            (1,201906,42),
                            (1,201909,3),
                            (1,201912,56)
                           ],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
|  1| 201901| 10|
|  1| 201903|  9|
|  1| 201904| 21|
|  1| 201906| 42|
|  1| 201909|  3|
|  1| 201912| 56|
+---+-------+---+

1) The basic idea is to create all id-week combinations using a cross join.

from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window

# Find the smallest and largest week present in the data
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()

# Generate all weeks in that range (spark.range names its column 'id', hence the rename)
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')

# all_weeks.show()

# Pair every distinct id with every week; rename 'id' to 'aid' to avoid a name clash in the join below
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')

# id_all_weeks.show()

2) After this, a left join of the original dataframe onto these combinations helps identify the missing values.

res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno|  id|weeknum| val|
+---+------+----+-------+----+
|  1|201911|null|   null|null|
|  1|201905|null|   null|null|
|  1|201903|   1| 201903|   9|
|  1|201904|   1| 201904|  21|
|  1|201901|   1| 201901|  10|
|  1|201906|   1| 201906|  42|
|  1|201908|null|   null|null|
|  1|201910|null|   null|null|
|  1|201912|   1| 201912|  56|
|  1|201907|null|   null|null|
|  1|201902|null|   null|null|
|  1|201909|   1| 201909|   3|
+---+------+----+-------+----+

3) Then a combination of window functions is used: sum -> to assign groups, and max -> to fill in the missing values within each group.

# Running count of non-null rows: each missing week inherits the group of the last present week
w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))

# Within each group, max(val) is the single non-null value, which fills the gaps
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) # select required columns as needed
missing_values_filled.show()

+---+------+----+-------+----+---+------+
|aid|weekno|  id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
|  1|201901|   1| 201901|  10|  1|    10|
|  1|201902|null|   null|null|  1|    10|
|  1|201903|   1| 201903|   9|  2|     9|
|  1|201904|   1| 201904|  21|  3|    21|
|  1|201905|null|   null|null|  3|    21|
|  1|201906|   1| 201906|  42|  4|    42|
|  1|201907|null|   null|null|  4|    42|
|  1|201908|null|   null|null|  4|    42|
|  1|201909|   1| 201909|   3|  5|     3|
|  1|201910|null|   null|null|  5|     3|
|  1|201911|null|   null|null|  5|     3|
|  1|201912|   1| 201912|  56|  6|    56|
+---+------+----+-------+----+---+------+
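
A minimal final projection on top of missing_values_filled can then restore the original column names (the aliases below are only illustrative):

# Keep only the needed columns and rename them back (aliases are illustrative)
result = missing_values_filled.select(missing_values_filled.aid.alias('id'),
                                      missing_values_filled.weekno.alias('weeknum'),
                                      missing_values_filled.filled.alias('val')
                                     ).orderBy('id','weeknum')
result.show()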

A Hive query with the same logic as above (assuming a table containing all weeks can be created):

select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
            ,w.weeknum
            ,t.val
            ,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp 
      from (select distinct id from tbl) i
      cross join weeks_table w
      left join tbl t on t.id = i.id and w.weeknum = t.weeknum
     ) t
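
For reference, here is one possible way to build the weeks_table assumed above. This is only a sketch: it assumes all weeks fall within a single year (201001..201052), requires a Hive version that supports SELECT without FROM in a subquery (0.13+), and the table name simply matches the one used in the query.

-- Generate week numbers 201001..201052 via posexplode over a string of 51 spaces (52 positions)
create table weeks_table as
select 201001 + pe.pos as weeknum
from (select 1 as dummy) d
lateral view posexplode(split(space(51), ' ')) pe as pos, val;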

nnt7mjpx2#

Although this answer is in Scala, the Python version would look almost identical and can easily be converted (a rough PySpark sketch is included at the end of this answer).
Step 1:
Find the rows that are followed by missing weeks.
Sample input:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//sample input
val input = sc.parallelize(List(("A100",201008,2), ("A100",201009,9),("A100",201014,4), ("A100",201016,45))).toDF("id","week","count")

scala> input.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201014|    4| //missing 4 rows
|A100|201016|   45| //missing 1 row
+----+------+-----+

To find them, we can use the lead() function on week and compute the difference between leadWeek and week. That difference should not be greater than 1; if it is, rows are missing after that row.

val diffDF = input
  .withColumn("leadWeek", lead($"week", 1).over(Window.partitionBy($"id").orderBy($"week")))   // partitioning by id & computing lead()
  .withColumn("diff", ($"leadWeek" - $"week") -1)                                 // finding difference between leadWeek & week

scala> diffDF.show
+----+------+-----+--------+----+
|  id|  week|count|leadWeek|diff|
+----+------+-----+--------+----+
|A100|201008|    2|  201009|   0| // diff -> 0 represents that no rows needs to be added
|A100|201009|    9|  201014|   4| // diff -> 4 represents 4 rows are to be added after this row.
|A100|201014|    4|  201016|   1| // diff -> 1 represents 1 row to be added after this row.
|A100|201016|   45|    null|null|
+----+------+-----+--------+----+

Step 2:
If diff >= 1: create and add n new rows (InputWithDiff, see the case class below) according to diff, incrementing the week value accordingly, and return the newly created rows along with the original row.
If diff is 0, no extra computation is needed; return the original row as it is.
Convert diffDF to a Dataset to make this computation easier.

case class InputWithDiff(id: Option[String], week: Option[Int], count: Option[Int], leadWeek: Option[Int], diff: Option[Int])

val diffDS = diffDF.as[InputWithDiff]

val output = diffDS.flatMap(x => {
 val diff = x.diff.getOrElse(0) 

 diff match {
  case n if n >= 1 => x :: (1 to diff).map(y => InputWithDiff(x.id, Some(x.week.get + y), x.count,x.leadWeek, x.diff)).toList  // create and append new Rows
  case _ => List(x)      // return as it is
 }
}).drop("leadWeek", "diff").toDF   // drop unnecessary columns & convert to DF

Final output:

scala> output.show
+----+------+-----+
|  id|  week|count|
+----+------+-----+
|A100|201008|    2|
|A100|201009|    9|
|A100|201010|    9|
|A100|201011|    9|
|A100|201012|    9|
|A100|201013|    9|
|A100|201014|    4|
|A100|201015|    4|
|A100|201016|   45|
+----+------+-----+
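
As noted at the top of this answer, the same idea translates almost directly to PySpark. A rough sketch follows (assuming an active SparkSession named spark; it mirrors the Scala code, including the simplification that week numbers do not cross a year boundary):

from pyspark.sql import Window
from pyspark.sql.functions import lead, col

# Same sample input as the Scala version
input_df = spark.createDataFrame([("A100", 201008, 2), ("A100", 201009, 9),
                                  ("A100", 201014, 4), ("A100", 201016, 45)],
                                 ["id", "week", "count"])

# Next week per id, and how many weeks are missing after each row
diff_df = (input_df
           .withColumn("leadWeek", lead("week", 1).over(Window.partitionBy("id").orderBy("week")))
           .withColumn("diff", (col("leadWeek") - col("week")) - 1))

def fill_gaps(row):
    # Emit the original row plus one row per missing week, carrying the previous count forward
    gaps = row["diff"] if row["diff"] is not None else 0
    return [(row["id"], row["week"] + i, row["count"]) for i in range(0, gaps + 1)]

output = diff_df.rdd.flatMap(fill_gaps).toDF(["id", "week", "count"])
output.orderBy("id", "week").show()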
