从类似日期(年)的列中,我得到以下类型的值:
+--------------------+------------+
| CUSTOMER_ID|yearSelected|
+--------------------+------------+
|1 | 2010|
|2 | 1992|
|3 | 1996|
|4 | 1990|
|5 | 1984|
+--------------------+------------+
现在我需要每五年对它们进行一次分组(1990年到1994年:第1组,1995年到1996年:第2组),依此类推,对于每个组,获取信息就像我在做df.descripe()
到目前为止我尝试了什么:
df8 = df4.groupBy('yearSelected')
stat_col = conf['by']
output = df8.agg(
F.count(stat_col).alias("count"),
F.mean(stat_col).alias("mean"),
F.min(stat_col).cast(DecimalType(36,2)).alias("min"),
F.max(stat_col).cast(DecimalType(36,2)).alias("max"),
F.sum((F.col(stat_col) > 0).cast(DecimalType(36,2))).alias("greaterThan0"),
F.sum((F.col(stat_col) == 0).cast(DoubleType())).alias("equalTo0"),
F.sum((F.col(stat_col) < 0).cast(DecimalType(36,2))).alias("lesserThan0"),
).toPandas()
这个问题缺少的是按年份范围分组,这是我还没有解决的问题。
另一个想法是使用windows,但我失败了:
windowSpecAgg = Window.partitionBy("yearSelected").orderBy("yearSelected")
df5 = df4.withColumn("row",row_number().over(windowSpecAgg)) \
.withColumn("avg", avg(col(conf['by'])).over(windowSpecAgg)) \
.withColumn("sum", sum(col(conf['by'])).over(windowSpecAgg)) \
.withColumn("min", min(col(conf['by'])).over(windowSpecAgg)) \
.withColumn("max", max(col(conf['by'])).over(windowSpecAgg)) \
.where(col("row")==1).select("yearSelected","avg","sum","min","max")
我失败的原因有两个,第一个是我无法正确地完成这个想法并将其转化为代码,第二个是我尝试为此Dataframe(而不是类似的Dataframe)执行的每个操作(show()、descripe()、count())都会得到:
py4j.protocol.Py4JJavaError: An error occurred while calling o2323.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 47.0 failed 1 times, most recent failure: Lost task 8.0 in stage 47.0 (TID 692, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
...
TypeError: strptime() argument 1 must be str, not None
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
如果dataframe有用的话,我就是这样构建它的(正如我之前所说的,我这样做是为了从数据框中定义的列中获取年份) conf[by]
:
funcDateTransf = udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df1 = df.withColumn('dateFormat', date_format(funcDateTransf(col(conf['by'])), 'MM-dd-yyy'))
df1 = df1.withColumn('date_in_dateFormat',
to_date(unix_timestamp(col('dateFormat'), 'MM-dd-yyyy').cast("timestamp")))
df3 = df1.select(conf['columnaJoin'],year('date_in_dateFormat').alias('yearSelected'))
# df3.show(5)
df4 = df1.join(df3, df1[conf['columnaJoin']] == df3[conf['columnaJoin']], 'inner')
我不明白为什么这种创建Dataframe的特殊方式会给我带来麻烦,而不是当我不创建这个额外的列时,但这正是我现在观察到的。
1条答案
按热度按时间8wtpewkr1#
从我对你的问题的理解来看,你正在为如何将每个年份划分为5年的范围而挣扎。假设您的数据集被称为
dataDF
. 现在,为了将您的个人年份按5年分组,您可以使用:结果数据集将是:
从这里,您可以按
new_column
并获得你提到的统计数据。这样做的缺点是您必须手动配置要选择的范围。然而,从你的例子来看,我猜你并没有走得太远,所以这应该是可行的:)