sparksql查询中的重分区重用交换

l3zydbqr  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(370)

我试图理解spark(2.4)物理计划。我们通过sqlapi与spark交互。
我正在使用下面的sql。sql在步骤1中有一个聚合和一个 join 下一步的操作。我的意图是 repartition 聚合步骤之前的源表,以便我可以重用它 Exchange 避免 Shuffles ( Exchanges )在下面的步骤中(在sm join步骤之前),但是它没有按照我的预期工作,因为spark添加了 Exchanges 在smj之前。你能帮我理解我哪里做错了吗。

create or replace temporary view prsn_dtl as
    select
    policy_num,
    prsn_id,
    eff_dt,
    from db.person_details
    cluster by policy_num;

    create or replace temporary view plcy_dtl as
    select
    policy_num,
    role_desc,
    prsn_actv_flg
    from plcy_detail;

    create or replace temporary view my_keys as
    select
    policy_num,
    prsn_id,
    max(eff_dt) eff_dt
    from prsn_dtl
    group by 1, 2;

    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;

在dag表示中,我找到了3个 Exchanges ( 2 在左分支和 1 (右分支中)-
步骤1)第一步 hashpartitioning(policy_num#92, 200) 由于手册 cluster byaggregate 第二步是 Aggregate 上的运算符 hashpartitioning(policy_num#163, prsn_id#164, 200) 步骤3)最后 hashpartitioning(policy_num#163) 排序合并联接之前
我的问题是:
为什么没有呢 Exchange (来自 cluster by )从上面的步骤1开始,get向下游传播,在sort merge join之前的步骤3中没有被重用。
我的期望是spark将重用 Exchange 从步骤1开始( cluster by )不会再加一个 Exchange (在smj之前)在步骤3中,因为左分支 repartitioned 在查询的早期。
谁能解释一下我哪里出了问题吗。感谢您的帮助。
注意:我使用的是spark 2.4
谢谢

ycl3bljg

ycl3bljg1#

通过不重命名查询中的列并将相同的列传播到下游查询,问题似乎得到了缓解。

相关问题