胶水Spark:某些任务有0条记录可用于随机播放,但某些任务有磁盘溢出

6qfn3psc  于 7个月前  发布在  Apache
关注(0)|答案(1)|浏览(106)

我有一个spark job,其中一些任务有零记录输出和shuffle读取大小,其中一些任务有内存和磁盘溢出。有人可以帮助我做些什么来优化执行。执行信息:repartition_cnt=3500 [数据集在S3中,执行是通过Glue G2 X与298 DPU)
代码:

fct_ate_df.repartition(expr(s"pmod(hash(mae_id, rowsin, dep), $repartition_cnt)"))
       .write
       .mode("overwrite")
       .format("parquet")
       .bucketBy(repartition_cnt, "rowsin", "rowsin","dep")
       .sortBy("rowsin","dep")
       .option("path", s"s3://b222-id/data22te=$dat22et_date")
       .saveAsTable(s"btemp.intte_${table_name}_${regd}")

摘要x1c 0d1x
无记录输出/混洗

溢漏记录

dfuffjeb

dfuffjeb1#

你正在使用reparition by expression,我认为这就是为什么你会看到那些空分区的原因。在这种情况下,spark内部将使用HashPartitioner,而这个partinioner并不保证分区是相等的。
由于哈希算法,您可以确定具有相同表达式值的记录将位于同一个分区中,但您可能会得到空分区或其中包含例如5个键的分区。
在这种情况下,numPartitions不会改变任何东西,如果一个bucket中有许多键(所以后来的分区),最终生成的分区少于numPartition Spark将生成空分区,如您在示例中所见
我认为,如果你想有相等的分区,你可以删除这个表达式,你正在计算哈希,只留下$repartition_cnt
由于Spark将使用RoundRobinPartitioner,因此它将生成equals分区
如果你想了解dipper,你可以看看源代码,我认为这里有很好的起点
在这里你可以找到连接到不带表达式的重新分区的逻辑:Spark源代码
在这里你可以找到用于按表达式分区的逻辑:Spark源代码
问候!

相关问题