如何通过从s3导入文件来动态计算sparkDataframe的每列中非空值的百分比?

5hcedyr0  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(260)

我想使用日期从s3路径动态导入文件(对于每个日期,s3路径上都有一个文件),导入之后,我想计算sparkDataframe的每一列全年的非空值百分比。就我而言,现在是2019年。
假设2019年:

columns   non null percentage

Column1        80%
Column2        75%
Column3        57%

我试着自己编写代码,但我不确定它是否能提供适当的输出:

daterange = pd.date_range(start="2019-01-01",end="2019-12-31")

for date in daterange

 df_clm= spark.read.parquet('s3:path/dt={date}')

    for col in df_clm.column
        null_cnt= df_clm[col].isna().sum()
        total= df_clm.count()
        percent= ((total-null_cnt)/total)*100

我想上面的代码会给我每天(每个文件)的输出,即365天(365个文件),但我想积累所有的文件,并计算每列的百分比。
注意:所有文件都有相同的列。
有人能帮我编码吗?

ffscu2ro

ffscu2ro1#

您可以从dataframe摘要中获取每列的非空计数。for循环非常糟糕。阅读你的整个Parquet文件和过滤日期,如你所愿,然后做下面的步骤。

df.show()

+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|  aa|  bb|
|  2|null|  cc|
|  3|  dd|null|
|  4|  ee|null|
|  5|  ff|null|
+---+----+----+

summary = df.describe().filter("summary = 'count'")
columns = df.columns
counts  = df.count()

summary.rdd \
  .flatMap(lambda row: map(lambda i: (columns[i-1], row[i]), range(1, len(row)))) \
  .toDF(['col', 'count']) \
  .withColumn('ratio', col('count') / counts * 100) \
  .show(10, False)

+----+-----+-----+
|col |count|ratio|
+----+-----+-----+
|id  |5    |100.0|
|val1|4    |80.0 |
|val2|2    |40.0 |
+----+-----+-----+

相关问题