I have a MovieLens CSV dataset whose columns are `movieId`, `userId`, `rating`, and `timestamp`. I am aggregating each movie's ratings by count and average. Here is my code.
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([
    StructField('UserID', StringType(), True),
    StructField('MovieID', StringType(), True),
    StructField('Rating', FloatType(), True),
    StructField('Timestamp', StringType(), True)
])
movie_df = spark.read.csv('../resources/ml-latest-small/ratings.csv', schema=schema, header='true') \
.select('MovieID', 'Rating')
movie_df.createOrReplaceTempView('movie_tbl')
popular_df = spark.sql("""
SELECT MovieID, count(*) AS rating_count, avg(Rating) AS avg_rating
FROM movie_tbl
GROUP BY MovieID
ORDER BY count(*) DESC """)
popular_df.write.csv(path='output/avg_ratings', mode='append', header='True')
By default the number of Spark shuffle partitions is 200, but my query ends up with only 46 partitions instead of 200. When I run explain(), both the rangepartitioning and hashpartitioning steps in the plan show 200 partitions.
>>> movie_df.rdd.getNumPartitions()
1
>>>
>>> popular_df.rdd.getNumPartitions()
46
>>> spark.conf.get('spark.sql.shuffle.partitions')
'200'
>>>
>>> popular_df.explain()
== Physical Plan ==
* (3) Sort [rating_count#10L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(rating_count#10L DESC NULLS LAST, 200), true, [id=#32]
+- *(2) HashAggregate(keys=[MovieID#1], functions=[count(1), avg(cast(Rating#2 as double))])
+- Exchange hashpartitioning(MovieID#1, 200), true, [id=#28]
+- *(1) HashAggregate(keys=[MovieID#1], functions=[partial_count(1), partial_avg(cast(Rating#2 as double))])
+- FileScan csv [MovieID#1,Rating#2] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/sgudisa/Desktop/python data analysis workbook/spark-workbook/resour..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<MovieID:string,Rating:float>
So how does the system arrive at 46 partitions?
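(One possible explanation, which is an assumption on my part rather than something the plan above confirms, is Spark 3.x adaptive query execution, which can coalesce the 200 planned shuffle partitions into fewer partitions at runtime. A minimal sketch for checking this, assuming a running SparkSession named `spark`:)

```python
# Sketch: inspect the AQE settings that can shrink the 200 planned
# shuffle partitions at runtime (assumes a SparkSession `spark` exists).
print(spark.conf.get('spark.sql.adaptive.enabled'))
print(spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled'))

# If coalescing is indeed the cause, disabling AQE should make the
# runtime partition count match the planned 200:
spark.conf.set('spark.sql.adaptive.enabled', 'false')
```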
Also, when I save the DataFrame with DataFrameWriter, I see 45 CSV part files (part-00000- through part-00044-) plus a _SUCCESS file. If I change my shuffle partitions to 4, I get 3 CSV files and a _SUCCESS file. So when generating the DAG, is the system coalescing the partitions or reducing them?
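(Independently of what the shuffle setting ends up as, the number of output part files can be pinned down explicitly by coalescing the DataFrame just before the write. A sketch, reusing `popular_df` from the code above:)

```python
# Sketch: force a single CSV part file regardless of how many shuffle
# partitions the query ran with, by coalescing before the write
# (assumes `popular_df` as defined above).
popular_df.coalesce(1) \
    .write.csv(path='output/avg_ratings', mode='append', header=True)
```

`coalesce(n)` only merges existing partitions and avoids a full shuffle, which makes it cheaper than `repartition(n)` when reducing the partition count for output.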