在使用Dataframe运行查询时,我发现了一些性能问题。我在研究中发现,长时间运行的finally任务可能是数据未受到最佳干扰的一个标志,但尚未找到解决此问题的详细过程。
我开始加载两个表作为Dataframe,然后在一个字段上连接这些表。为了提高性能,我尝试添加distribute by(重分区)和sort by,但仍然看到这个长时间运行的最终任务。这里是我的代码的一个简单版本,请注意查询1和查询2实际上并不是这么简单,使用自定义项来计算一些值。
我尝试了一些不同的设置 spark.sql.shuffle
. 我试了100次,但失败了(老实说,我并没有真正调试这个)。我试了300、4000和8000。业绩随着增长而下降。我选择一天的数据,其中每个文件是一个小时。
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
因为按userid分区似乎不太理想,所以我可以改为按时间戳分区。如果我这样做,我应该只做日期+小时吗?如果我有少于200个唯一的组合,我会有空的执行者吗?
1条答案
按热度按时间fcwjkofz1#
Spark>=3.0
因为3.0spark提供了处理歪斜连接的内置优化,可以使用
spark.sql.adaptive.optimizeSkewedJoin.enabled
财产。详见spark-29544。
Spark<3.0
你显然有一个巨大的右数据倾斜的问题。让我们看看您提供的统计数据:
平均值在5左右,标准差在2000以上,你会得到一条长尾。
由于重新分区后某些密钥比其他密钥更频繁,因此某些执行器将比其余的执行器有更多的工作要做。
此外,根据您的描述,问题可能出在散列到同一分区的单个或几个密钥上。
那么,让我们首先确定异常值(伪代码):
其余的:
这真的值得期待吗?如果没有,试着找出问题的源头。
如果需要,您可以尝试:
广播小表:
分裂,统一(
union
)只经常广播:盐渍
userId
一些随机数据但你不应该:
重新划分所有数据并在本地排序(尽管单独在本地排序不成问题)
对完整数据执行标准哈希联接。