Computing the sum over a 24-hour time frame in a Spark DataFrame

Asked by yws3nbqq on 2021-05-27, tagged Spark

I want to compute, for each date, the sum over a 24-hour window from that date to date+1, splitting the rows on an hour boundary. Sample data (id, timestamp, count):

```
1, 2018-05-01 02:12:00,1
1, 2018-05-01 03:16:10,2
1, 2018-05-01 09:12:00,4
1, 2018-05-01 14:18:00,3
1, 2018-05-01 18:32:00,1
1, 2018-05-01 20:12:00,1
1, 2018-05-02 01:22:00,1
1, 2018-05-02 02:12:00,1
1, 2018-05-02 08:30:00,1
1, 2018-05-02 10:12:00,1
1, 2018-05-02 11:32:00,1
1, 2018-05-02 18:12:00,1
1, 2018-05-03 03:12:00,1
1, 2018-05-03 08:22:00,1
```

Here each bucket runs from 9am to 9am of the next day; the expected output is:

```
1, 2018-05-01,12
1, 2018-05-02,5
```

Answer 1 (pgccezyw):

First, define df for reproducibility:

```
import pandas as pd
import io

data = """
1, 2018-05-01 02:12:00,1
1, 2018-05-01 03:16:10,2
1, 2018-05-01 09:12:00,4
1, 2018-05-01 14:18:00,3
1, 2018-05-01 18:32:00,1
1, 2018-05-01 20:12:00,1
1, 2018-05-02 01:22:00,1
1, 2018-05-02 02:12:00,1
1, 2018-05-02 08:30:00,1
1, 2018-05-02 10:12:00,1
1, 2018-05-02 11:32:00,1
1, 2018-05-02 18:12:00,1
1, 2018-05-03 03:12:00,1
1, 2018-05-03 08:22:00,1
"""

df = pd.read_csv(io.StringIO(data), sep=',', names=['id', 't', 'n'], parse_dates=['t'])
```

Then use pd.Grouper with the frequency set to 24 hours and the base parameter set to 9, so that each bucket starts at 9am:

```
df.groupby(pd.Grouper(key='t', freq='24h', base=9)).n.sum()
```

Result:

```
t
2018-04-30 09:00:00     3
2018-05-01 09:00:00    12
2018-05-02 09:00:00     5
Freq: 24H, Name: n, dtype: int64
```
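
On newer pandas releases (1.1 and up) the base argument is deprecated; the offset argument is the usual replacement. A minimal sketch, assuming such a pandas version is installed:

```
# offset='9h' shifts the 24-hour bins so each one starts at 09:00
# (replacement for the deprecated base=9 on pandas >= 1.1).
df.groupby(pd.Grouper(key='t', freq='24h', offset='9h')).n.sum()
```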

Answer 2 (dgtucam1):

Simply shift the timestamp column back by 9 hours, then group by the date of the shifted column:

```
from pyspark.sql.functions import expr, sum as fsum

df
# DataFrame[id: int, dtime: timestamp, cnt: int]

df.groupby("id", expr("date(dtime - interval 9 hours) as ddate")) \
  .agg(fsum("cnt").alias("cnt")) \
  .show()
# +---+----------+---+
# | id|     ddate|cnt|
# +---+----------+---+
# |  1|2018-05-01| 12|
# |  1|2018-05-02|  5|
# |  1|2018-04-30|  3|
# +---+----------+---+
```
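
This answer assumes a DataFrame with columns id, dtime (timestamp) and cnt. For reproducibility, a minimal sketch that builds it from the question's sample data (the column names dtime/cnt are this answer's, not the question's):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

rows = [
    (1, "2018-05-01 02:12:00", 1), (1, "2018-05-01 03:16:10", 2),
    (1, "2018-05-01 09:12:00", 4), (1, "2018-05-01 14:18:00", 3),
    (1, "2018-05-01 18:32:00", 1), (1, "2018-05-01 20:12:00", 1),
    (1, "2018-05-02 01:22:00", 1), (1, "2018-05-02 02:12:00", 1),
    (1, "2018-05-02 08:30:00", 1), (1, "2018-05-02 10:12:00", 1),
    (1, "2018-05-02 11:32:00", 1), (1, "2018-05-02 18:12:00", 1),
    (1, "2018-05-03 03:12:00", 1), (1, "2018-05-03 08:22:00", 1),
]

# Build the DataFrame and cast the string timestamps to a real timestamp column.
df = spark.createDataFrame(rows, ["id", "dtime", "cnt"]) \
          .withColumn("dtime", col("dtime").cast("timestamp"))
```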

Answer 3 (unguejic):

Use the Spark built-in functions date_format(), date_add() and to_date(), then groupBy and aggregate.
Example (Spark-Scala):

```
df.show()
//+---+-------------------+---+
//| id|               date|cnt|
//+---+-------------------+---+
//|  1|2018-05-01 02:12:00|  1|
//|  1|2018-05-01 03:16:10|  2|
//|  1|2018-05-01 09:12:00|  4|
//|  1|2018-05-01 14:18:00|  3|
//|  1|2018-05-01 18:32:00|  1|
//|  1|2018-05-01 20:12:00|  1|
//|  1|2018-05-02 01:22:00|  1|
//|  1|2018-05-02 02:12:00|  1|
//|  1|2018-05-02 08:30:00|  1|
//|  1|2018-05-02 10:12:00|  1|
//|  1|2018-05-02 11:32:00|  1|
//|  1|2018-05-02 18:12:00|  1|
//|  1|2018-05-03 03:12:00|  1|
//|  1|2018-05-03 08:22:00|  1|
//+---+-------------------+---+

import org.apache.spark.sql.functions._

// Rows at or after 9am keep their own date; earlier rows fall into the previous day's bucket.
df.withColumn("hour",
    when(date_format(col("date"), "HH").cast("int") >= 9, to_date(col("date")))
      .otherwise(date_add(to_date(col("date")), -1)))
  .groupBy("id", "hour")
  .agg(sum("cnt").cast("int").alias("sum"))
  .show()
//+---+----------+---+
//| id|      hour|sum|
//+---+----------+---+
//|  1|2018-05-01| 12|
//|  1|2018-05-02|  5|
//|  1|2018-04-30|  3|
//+---+----------+---+
```
PySpark:
```
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Same bucketing as the Scala version; the chain is wrapped in parentheses
# so it spans multiple lines without a syntax error.
(df.withColumn("hour", when(date_format(col("date"), "HH").cast("int") >= 9, to_date(col("date")))
                       .otherwise(date_add(to_date(col("date")), -1)))
   .groupBy("id", "hour")
   .agg(sum("cnt").cast("int").alias("sum"))
   .show())
# +---+----------+---+
# | id|      hour|sum|
# +---+----------+---+
# |  1|2018-05-01| 12|
# |  1|2018-05-02|  5|
# |  1|2018-04-30|  3|
# +---+----------+---+
```
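
As an additional sketch not taken from the answers above: Spark also ships a built-in window() function that buckets timestamps into tumbling windows, and its startTime argument can shift those windows so each one starts at 09:00. Using the date/cnt column names from this answer, something like the following should produce the same 9am-to-9am buckets, though the exact window boundaries depend on the session time zone, so verify on your setup:

```
from pyspark.sql.functions import window, to_date, sum as fsum

# 24-hour tumbling windows shifted by 9 hours, so each window covers
# 09:00 of one day to 09:00 of the next (assumption: whole-hour time zone offset).
df.groupBy("id", window("date", "24 hours", startTime="9 hours")) \
  .agg(fsum("cnt").cast("int").alias("sum")) \
  .select("id", to_date("window.start").alias("day"), "sum") \
  .show()
```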
