spark最终任务比前199个任务长100倍,如何改进

fiei3ece  于 2021-06-28  发布在  Hive
关注(0)|答案(1)|浏览(288)

在使用Dataframe运行查询时,我发现了一些性能问题。我在研究中发现,长时间运行的finally任务可能是数据未受到最佳干扰的一个标志,但尚未找到解决此问题的详细过程。
我开始加载两个表作为Dataframe,然后在一个字段上连接这些表。为了提高性能,我尝试添加distribute by(重分区)和sort by,但仍然看到这个长时间运行的最终任务。这里是我的代码的一个简单版本,请注意查询1和查询2实际上并不是这么简单,使用自定义项来计算一些值。
我尝试了一些不同的设置 spark.sql.shuffle . 我试了100次,但失败了(老实说,我并没有真正调试这个)。我试了300、4000和8000。业绩随着增长而下降。我选择一天的数据,其中每个文件是一个小时。

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

因为按userid分区似乎不太理想,所以我可以改为按时间戳分区。如果我这样做,我应该只做日期+小时吗?如果我有少于200个唯一的组合,我会有空的执行者吗?

fcwjkofz

fcwjkofz1#

Spark>=3.0
因为3.0spark提供了处理歪斜连接的内置优化,可以使用 spark.sql.adaptive.optimizeSkewedJoin.enabled 财产。
详见spark-29544。
Spark<3.0
你显然有一个巨大的右数据倾斜的问题。让我们看看您提供的统计数据:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

平均值在5左右,标准差在2000以上,你会得到一条长尾。
由于重新分区后某些密钥比其他密钥更频繁,因此某些执行器将比其余的执行器有更多的工作要做。
此外,根据您的描述,问题可能出在散列到同一分区的单个或几个密钥上。
那么,让我们首先确定异常值(伪代码):

val mean = 4.989209978967438 
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))

其余的:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

这真的值得期待吗?如果没有,试着找出问题的源头。
如果需要,您可以尝试:
广播小表:

val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")

分裂,统一( union )只经常广播:

df2.join(broadcast(frequent), Seq("userId"), "rightouter")
  .union(df2.join(infrequent, Seq("userId"), "rightouter"))

盐渍 userId 一些随机数据
但你不应该:
重新划分所有数据并在本地排序(尽管单独在本地排序不成问题)
对完整数据执行标准哈希联接。

相关问题