我有一个spark job,其中一些任务有零记录输出和shuffle读取大小,其中一些任务有内存和磁盘溢出。有人可以帮助我做些什么来优化执行。执行信息:repartition_cnt=3500 [数据集在S3中,执行是通过Glue G2 X与298 DPU)
代码:
fct_ate_df.repartition(expr(s"pmod(hash(mae_id, rowsin, dep), $repartition_cnt)"))
.write
.mode("overwrite")
.format("parquet")
.bucketBy(repartition_cnt, "rowsin", "rowsin","dep")
.sortBy("rowsin","dep")
.option("path", s"s3://b222-id/data22te=$dat22et_date")
.saveAsTable(s"btemp.intte_${table_name}_${regd}")
摘要x1c 0d1x
无记录输出/混洗
溢漏记录
1条答案
按热度按时间dfuffjeb1#
你正在使用reparition by expression,我认为这就是为什么你会看到那些空分区的原因。在这种情况下,spark内部将使用HashPartitioner,而这个partinioner并不保证分区是相等的。
由于哈希算法,您可以确定具有相同表达式值的记录将位于同一个分区中,但您可能会得到空分区或其中包含例如5个键的分区。
在这种情况下,numPartitions不会改变任何东西,如果一个bucket中有许多键(所以后来的分区),最终生成的分区少于numPartition Spark将生成空分区,如您在示例中所见
我认为,如果你想有相等的分区,你可以删除这个表达式,你正在计算哈希,只留下$repartition_cnt
由于Spark将使用RoundRobinPartitioner,因此它将生成equals分区
如果你想了解dipper,你可以看看源代码,我认为这里有很好的起点
在这里你可以找到连接到不带表达式的重新分区的逻辑:Spark源代码
在这里你可以找到用于按表达式分区的逻辑:Spark源代码
问候!