Getting describe()-like statistics for groups of years in a column

Asked by 3lxsmp7m on 2021-05-16 in Spark

From a date-like column I extract the year, which gives me values of this kind:

+--------------------+------------+
|         CUSTOMER_ID|yearSelected|
+--------------------+------------+
|1                   |        2010|
|2                   |        1992|
|3                   |        1996|
|4                   |        1990|
|5                   |        1984|
+--------------------+------------+

Now I need to group them into five-year buckets (1990 to 1994: group 1, 1995 to 1999: group 2, and so on), and for each group get the same information I would get from df.describe().
What I have tried so far:

from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, DoubleType

# Aggregate per individual year (not yet per five-year range)
stat_col = conf['by']
df8 = df4.groupBy('yearSelected')

output = df8.agg(
    F.count(stat_col).alias("count"),
    F.mean(stat_col).alias("mean"),
    F.min(stat_col).cast(DecimalType(36, 2)).alias("min"),
    F.max(stat_col).cast(DecimalType(36, 2)).alias("max"),
    # booleans cast to numeric become 0/1, so these sums are counts
    F.sum((F.col(stat_col) > 0).cast(DecimalType(36, 2))).alias("greaterThan0"),
    F.sum((F.col(stat_col) == 0).cast(DoubleType())).alias("equalTo0"),
    F.sum((F.col(stat_col) < 0).cast(DecimalType(36, 2))).alias("lesserThan0"),
).toPandas()
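
[Editor's note] A minimal sketch of one way to close the gap described below, grouping by a derived five-year bucket instead of the raw year. The column name yearBucket and the 1990 base year are assumptions of mine, not from the original post:

from pyspark.sql import functions as F

# Map 1990-1994 -> 1, 1995-1999 -> 2, ...; years before the base year come
# out zero or negative, so pick a base at or below the minimum year
df_bucketed = df4.withColumn(
    "yearBucket",
    (F.floor((F.col("yearSelected") - F.lit(1990)) / 5) + 1).cast("int"),
)

# The same describe()-like aggregation, now per five-year range
bucket_stats = df_bucketed.groupBy("yearBucket").agg(
    F.count(stat_col).alias("count"),
    F.mean(stat_col).alias("mean"),
    F.min(stat_col).alias("min"),
    F.max(stat_col).alias("max"),
)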

What this is still missing is the grouping by year ranges, which I have not solved yet (the sketch above shows one possible direction).
Another idea was to use window functions, but I failed:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, row_number

# Ordering by the partition column makes all rows peers, so the windowed
# aggregates below still cover the whole partition
windowSpecAgg = Window.partitionBy("yearSelected").orderBy("yearSelected")
df5 = df4.withColumn("row", row_number().over(windowSpecAgg)) \
  .withColumn("avg", F.avg(col(conf['by'])).over(windowSpecAgg)) \
  .withColumn("sum", F.sum(col(conf['by'])).over(windowSpecAgg)) \
  .withColumn("min", F.min(col(conf['by'])).over(windowSpecAgg)) \
  .withColumn("max", F.max(col(conf['by'])).over(windowSpecAgg)) \
  .where(col("row") == 1).select("yearSelected", "avg", "sum", "min", "max")
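
[Editor's note] If the window route is kept, the same idea can be pointed at a five-year bucket instead of the raw year. A sketch, assuming the df_bucketed and stat_col names from the earlier snippet (mine, not the original poster's):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Aggregates over the whole bucket partition; no ordering is needed here
bucketWindow = Window.partitionBy("yearBucket")

df_window_stats = (
    df_bucketed
    .withColumn("avg", F.avg(F.col(stat_col)).over(bucketWindow))
    .withColumn("sum", F.sum(F.col(stat_col)).over(bucketWindow))
    .withColumn("min", F.min(F.col(stat_col)).over(bucketWindow))
    .withColumn("max", F.max(F.col(stat_col)).over(bucketWindow))
    .select("yearBucket", "avg", "sum", "min", "max")
    .dropDuplicates(["yearBucket"])
)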

I failed for two reasons. The first is that I could not work the idea out properly and turn it into code. The second is that every action I try on this Dataframe (show(), describe(), count()), but not on similar Dataframes, fails with:

py4j.protocol.Py4JJavaError: An error occurred while calling o2323.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 47.0 failed 1 times, most recent failure: Lost task 8.0 in stage 47.0 (TID 692, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

...
TypeError: strptime() argument 1 must be str, not None

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)

In case the Dataframe itself is useful, this is how I build it (as I said before, I do this to get the year from the column defined in conf['by']):

from datetime import datetime
from pyspark.sql.functions import col, date_format, to_date, udf, unix_timestamp, year
from pyspark.sql.types import DateType

# NULL guard: the original lambda passed None straight to strptime, which is
# exactly the TypeError in the traceback above
funcDateTransf = udf(lambda x: datetime.strptime(x, '%Y%m%d') if x is not None else None, DateType())

# The pattern here must match the 'MM-dd-yyyy' used for parsing below
# (the original had a typo: 'MM-dd-yyy')
df1 = df.withColumn('dateFormat', date_format(funcDateTransf(col(conf['by'])), 'MM-dd-yyyy'))
df1 = df1.withColumn('date_in_dateFormat',
               to_date(unix_timestamp(col('dateFormat'), 'MM-dd-yyyy').cast("timestamp")))
df3 = df1.select(conf['columnaJoin'], year('date_in_dateFormat').alias('yearSelected'))

# df3.show(5)

df4 = df1.join(df3, df1[conf['columnaJoin']] == df3[conf['columnaJoin']], 'inner')
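
[Editor's note] As an aside, not from the original post: the Python UDF can be avoided entirely with the built-in to_date, which parses the 'yyyyMMdd' string natively and simply returns NULL for NULL input rather than raising the strptime TypeError shown above:

from pyspark.sql.functions import col, to_date, year

# Parse the raw 'yyyyMMdd' string directly; NULL inputs yield NULL dates
df_native = df.withColumn('parsedDate', to_date(col(conf['by']), 'yyyyMMdd')) \
              .withColumn('yearSelected', year(col('parsedDate')))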

I do not understand why this particular way of creating the Dataframe causes trouble when not creating the extra column does not, but that is exactly what I am observing.


8wtpewkr 1#

From what I understand of your question, you are struggling with how to assign each year to a 5-year range. Let's say your dataset is called dataDF. To group your individual years into 5-year buckets, you can use:

from pyspark.sql import functions as F

dataDF.show()
+--------------------+------------+
|         CUSTOMER_ID|yearSelected|
+--------------------+------------+
|1                   |        2010|
|2                   |        1992|
|3                   |        1996|
|4                   |        1990|
|5                   |        1984|
+--------------------+------------+

dataDF.withColumn("new_column",
      F.when((F.col("yearSelected") >= 1980) & (F.col("yearSelected") <= 1984), "Group 1")
       .when((F.col("yearSelected") >= 1985) & (F.col("yearSelected") <= 1989), "Group 2")
       .when((F.col("yearSelected") >= 1990) & (F.col("yearSelected") <= 1994), "Group 3")
       .when((F.col("yearSelected") >= 1995) & (F.col("yearSelected") <= 1999), "Group 4")
       .when((F.col("yearSelected") >= 2000) & (F.col("yearSelected") <= 2004), "Group 5")
       .when((F.col("yearSelected") >= 2005) & (F.col("yearSelected") <= 2009), "Group 6")
       .when((F.col("yearSelected") >= 2010) & (F.col("yearSelected") <= 2014), "Group 7")
       .otherwise("N/A")).show()

The resulting dataset will be:

+--------------------+------------+-------------+
|         CUSTOMER_ID|yearSelected| new_column  |
+--------------------+------------+-------------+
|1                   |        2010|   Group 7   |
|2                   |        1992|   Group 3   |
|3                   |        1996|   Group 4   |
|4                   |        1990|   Group 3   |
|5                   |        1984|   Group 1   |
+--------------------+------------+-------------+

From here you can group by new_column and compute the statistics you mentioned.
The downside of this approach is that you have to configure the ranges by hand. Judging from your example, though, the years do not span that wide a range, so this should be workable :)
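
[Editor's note] If configuring the ranges by hand becomes tedious, the labels can also be generated arithmetically. A minimal sketch, assuming a base year of 1980 to mirror the answer's first bucket (any base at or below the minimum year works):

from pyspark.sql import functions as F

BASE_YEAR = 1980  # first year of "Group 1"; adjust to your data

# floor((year - base) / 5) + 1 maps 1980-1984 -> Group 1, 1985-1989 -> Group 2, ...
grouped = dataDF.withColumn(
    "new_column",
    F.concat(
        F.lit("Group "),
        (F.floor((F.col("yearSelected") - BASE_YEAR) / 5) + 1).cast("string"),
    ),
)
grouped.show()

The labels then extend automatically as new years appear in the data.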
