Aggregating timestamps with a 1-second difference in PySpark

iecba09b · posted 2021-05-29 in Spark

I have a PySpark DataFrame like the example below (the original data has roughly 1.5 million records per day). It contains user data with start_time and end_time columns plus several demographic variables (id, age group, county, etc.). Many records are only one second apart.

+--------+-------------+---------+-----------------------+-------------------+---------+
|id      | date        | group   |start_time             | end_time          | duration|
+--------+-------------+---------+-----------------------+-------------------+---------+
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:00:00|2020-04-14 19:23:59|24       |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:24:00|2020-04-14 19:26:59|4        |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:27:00|2020-04-14 19:35:59|8        |
|    78aa| 2020-04-14  | 3       |    2020-04-14 19:36:00|2020-04-14 19:55:00|19       |
|    25aa| 2020-04-15  | 7       |    2020-04-15 08:00:00|2020-04-15 08:02:59|3        |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:03:00|2020-04-15 11:11:59|9        |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:12:00|2020-04-15 11:45:59|34       |
|    25aa| 2020-04-15  | 7       |    2020-04-15 11:46:00|2020-04-15 11:47:00|1        |
+--------+-------------+---------+-----------------------+-------------------+---------+

My attempt: aggregate the data over the whole day

from pyspark.sql.functions import sum, first  # note: shadows Python's built-in sum()

df = df.groupBy("date", "id").agg(first("group"), sum("duration"))\
       .toDF("date", "id", "group", "duration")

I also need the DataFrame aggregated at the user level over contiguous transmission periods during the day. How can I get this with PySpark? I don't want to convert the data to a pandas DataFrame, because pandas loads the data into the driver's memory and I would run into memory issues. Here is the desired output:

+--------+--------------+------+-----------------------+-------------------+---------+
|id      |  date        |group |start_time             | end_time          | duration|
+--------+--------------+------+-----------------------+-------------------+---------+
|    78aa|  2020-04-14  | 3    |    2020-04-14 19:00:00|2020-04-14 19:55:00|55       |
|    25aa|  2020-04-15  | 7    |    2020-04-15 08:00:00|2020-04-15 08:02:59|3        |
|    25aa|  2020-04-15  | 7    |    2020-04-15 11:00:00|2020-04-15 11:47:00|44       |
+--------+--------------+------+-----------------------+-------------------+---------+

a9wyjsp7 1#

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF and the 'symbol column syntax; assumes a SparkSession named `spark`

val df0 = Seq(
  ("78aa", "2020-04-14", 3, "2020-04-14 19:00:00", "2020-04-14 19:23:59", 24),
  ("78aa", "2020-04-14", 3, "2020-04-14 19:24:00", "2020-04-14 19:26:59", 4),
  ("78aa", "2020-04-14", 3, "2020-04-14 19:27:00", "2020-04-14 19:35:59", 8),
  ("78aa", "2020-04-14", 3, "2020-04-14 19:36:00", "2020-04-14 19:55:00", 19),
  ("25aa", "2020-04-15", 7, "2020-04-15 08:00:00", "2020-04-15 08:02:59", 3),
  ("25aa", "2020-04-15", 7, "2020-04-15 11:03:00", "2020-04-15 11:11:59", 9),
  ("25aa", "2020-04-15", 7, "2020-04-15 11:12:00", "2020-04-15 11:45:59", 34),
  ("25aa", "2020-04-15", 7, "2020-04-15 11:46:00", "2020-04-15 11:47:00", 1)
).toDF("id", "date", "group", "start_time", "end_time", "duration")

// Bucket each record by the hour in which it starts
// ("yyyy", not "YYYY": "YYYY" is the week-based year and gives wrong results around New Year)
val df1 = df0.withColumn("start_time_1", date_format('start_time, "yyyy-MM-dd HH"))

df1.show(false)

// Within each hour bucket keep the earliest start, the latest end and the total duration
val res = df1.groupBy("id", "date", "group", "start_time_1")
  .agg(min('start_time).alias("start_time"), max('end_time).alias("end_time"), sum('duration).alias("duration"))
  .orderBy('start_time.asc)
  .drop("start_time_1")

res.show(false)
//    +----+----------+-----+-------------------+-------------------+--------+
//    |id  |date      |group|start_time         |end_time           |duration|
//    +----+----------+-----+-------------------+-------------------+--------+
//    |78aa|2020-04-14|3    |2020-04-14 19:00:00|2020-04-14 19:55:00|55      |
//    |25aa|2020-04-15|7    |2020-04-15 08:00:00|2020-04-15 08:02:59|3       |
//    |25aa|2020-04-15|7    |2020-04-15 11:03:00|2020-04-15 11:47:00|44      |
//    +----+----------+-----+-------------------+-------------------+--------+
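
Since the question asks for PySpark, a rough PySpark equivalent of this hour-bucket approach could look like the sketch below (it assumes the question's DataFrame df with string timestamp columns). Note that it only merges records whose start_time falls in the same clock hour, so a contiguous block that crosses an hour boundary would still be split.

from pyspark.sql import functions as F

# Bucket rows by the hour in which they start, then collapse each bucket
res = df.withColumn("start_hour", F.date_format("start_time", "yyyy-MM-dd HH")) \
        .groupBy("id", "date", "group", "start_hour") \
        .agg(F.min("start_time").alias("start_time"),
             F.max("end_time").alias("end_time"),
             F.sum("duration").alias("duration")) \
        .orderBy("start_time") \
        .drop("start_hour")

res.show(truncate=False)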

qhhrdooz 2#

Try this. You need to create an additional column that groups the timings where they follow each other within 1 second, using window functions.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id", "date", "group").orderBy("start_time")

# Start a new block whenever the gap to the previous end_time exceeds 1 second;
# the running sum of those flags ("check") is a per-block id within each partition.
df.withColumn("check", F.sum(F.when(F.unix_timestamp("start_time") - F.lag(F.unix_timestamp("end_time")).over(w) > 1,
                                    F.lit(1)).otherwise(F.lit(0))).over(w)) \
  .groupBy("date", "id", "group", "check") \
  .agg(F.first("start_time").alias("start_time"), F.last("end_time").alias("end_time"),
       F.sum("duration").alias("duration")) \
  .drop("check").show()

# +----------+----+-----+-------------------+-------------------+--------+
# |      date|  id|group|         start_time|           end_time|duration|
# +----------+----+-----+-------------------+-------------------+--------+
# |2020-04-14|78aa|    3|2020-04-14 19:00:00|2020-04-14 19:55:00|      55|
# |2020-04-15|25aa|    7|2020-04-15 08:00:00|2020-04-15 08:02:59|       3|
# |2020-04-15|25aa|    7|2020-04-15 11:03:00|2020-04-15 11:47:00|      44|
# +----------+----+-----+-------------------+-------------------+--------+
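
One possible refinement, not part of the original answer: first and last inside a plain groupBy aggregation are not guaranteed to respect the window ordering, so using min on start_time and max on end_time makes the block boundaries deterministic. A self-contained sketch under that assumption:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id", "date", "group").orderBy("start_time")

# Same block id ("check") as above, but aggregate with min/max for deterministic boundaries
blocks = df.withColumn("check", F.sum(F.when(F.unix_timestamp("start_time") - F.lag(F.unix_timestamp("end_time")).over(w) > 1,
                                             F.lit(1)).otherwise(F.lit(0))).over(w))

blocks.groupBy("date", "id", "group", "check") \
      .agg(F.min("start_time").alias("start_time"),
           F.max("end_time").alias("end_time"),
           F.sum("duration").alias("duration")) \
      .drop("check").show()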
