Actual number of partitions after groupBy vs. the DataFrame's shuffle partitions

6tr1vspr · posted 2021-05-17 in Spark

I have a MovieLens CSV dataset with the columns "MovieID", "UserID", "Rating", and "Timestamp". I aggregate the ratings of each movie into a count and an average. Here is my code.

from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([
                    StructField('UserID', StringType(), True),
                    StructField('MovieID', StringType(), True),
                    StructField('Rating', FloatType(), True),
                    StructField('Timestamp', StringType(), True)
                    ])

movie_df = spark.read.csv('../resources/ml-latest-small/ratings.csv', schema=schema, header='true') \
    .select('MovieID', 'Rating')

movie_df.createOrReplaceTempView('movie_tbl')

popular_df = spark.sql("""
                        SELECT MovieID, count(*) AS rating_count, avg(Rating) AS avg_rating
                        FROM movie_tbl
                        GROUP BY MovieID
                        ORDER BY count(*) DESC """)

popular_df.write.csv(path='output/avg_ratings', mode='append', header='True')
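For reference, the same aggregation expressed with the DataFrame API (just a sketch of what I believe is equivalent to the SQL above; popular_df2 is only an illustrative name):

from pyspark.sql import functions as F

# Same count/average per MovieID, sorted by rating count descending
popular_df2 = movie_df.groupBy('MovieID') \
    .agg(F.count('*').alias('rating_count'),
         F.avg('Rating').alias('avg_rating')) \
    .orderBy(F.desc('rating_count'))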

By default the number of Spark shuffle partitions is 200, but popular_df ends up with only 46 partitions rather than 200. When I run explain(), the rangepartitioning and hashpartitioning exchanges in the plan both show 200 partitions.

>>> movie_df.rdd.getNumPartitions()
1
>>> 
>>> popular_df.rdd.getNumPartitions()
46                                                                              
>>> spark.conf.get('spark.sql.shuffle.partitions')
'200'
>>>
>>> popular_df.explain()
== Physical Plan ==
*(3) Sort [rating_count#10L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(rating_count#10L DESC NULLS LAST, 200), true, [id=#32]
   +- *(2) HashAggregate(keys=[MovieID#1], functions=[count(1), avg(cast(Rating#2 as double))])
      +- Exchange hashpartitioning(MovieID#1, 200), true, [id=#28]
         +- *(1) HashAggregate(keys=[MovieID#1], functions=[partial_count(1), partial_avg(cast(Rating#2 as double))])
            +- FileScan csv [MovieID#1,Rating#2] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/sgudisa/Desktop/python data analysis workbook/spark-workbook/resour..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<MovieID:string,Rating:float>
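I am not sure whether adaptive query execution matters here; when it is enabled, Spark 3.x can coalesce post-shuffle partitions at runtime, so the 200 printed in the static plan would not have to match the final partition count. This is how I would read the relevant settings (a sketch; these are the standard Spark SQL config keys, and the 'not set' fallback is only there in case a key does not exist in my version):

# Check the adaptive query execution (AQE) settings;
# the second argument is a fallback if the key is missing
print(spark.conf.get('spark.sql.adaptive.enabled', 'not set'))
print(spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled', 'not set'))
print(spark.conf.get('spark.sql.adaptive.advisoryPartitionSizeInBytes', 'not set'))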

So how does Spark arrive at 46 partitions?
Also, when I save the DataFrame with the DataFrameWriter, I see 45 CSV part files (part-00000- through part-00044-) and one _SUCCESS file. If I change my shuffle partitions to 4, I get 3 CSV files and one _SUCCESS file. So when the DAG is generated, is Spark coalescing a partition or dropping one?
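In case it helps, this is how I would check whether some of the 46 partitions are simply empty (a sketch I have not run yet; spark_partition_id is the built-in function that returns the partition id of each row):

from pyspark.sql.functions import spark_partition_id

# Count rows in each partition of popular_df;
# a partition with zero rows would not appear in the output
popular_df.withColumn('pid', spark_partition_id()) \
    .groupBy('pid').count() \
    .orderBy('pid') \
    .show(200)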

No answers yet.
