我试图理解spark(2.4)物理计划。我们通过sqlapi与spark交互。
我正在使用下面的sql。sql在步骤1中有一个聚合和一个 join
下一步的操作。我的意图是 repartition
聚合步骤之前的源表,以便我可以重用它 Exchange
避免 Shuffles
( Exchanges
)在下面的步骤中(在sm join步骤之前),但是它没有按照我的预期工作,因为spark添加了 Exchanges
在smj之前。你能帮我理解我哪里做错了吗。
create or replace temporary view prsn_dtl as
select
policy_num,
prsn_id,
eff_dt,
from db.person_details
cluster by policy_num;
create or replace temporary view plcy_dtl as
select
policy_num,
role_desc,
prsn_actv_flg
from plcy_detail;
create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) eff_dt
from prsn_dtl
group by 1, 2;
select
keys.policy_num,
keys.prsn_id,
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;
在dag表示中,我找到了3个 Exchanges
( 2
在左分支和 1
(右分支中)-
步骤1)第一步 hashpartitioning(policy_num#92, 200)
由于手册 cluster by
在 aggregate
第二步是 Aggregate
上的运算符 hashpartitioning(policy_num#163, prsn_id#164, 200)
步骤3)最后 hashpartitioning(policy_num#163)
排序合并联接之前
我的问题是:
为什么没有呢 Exchange
(来自 cluster by
)从上面的步骤1开始,get向下游传播,在sort merge join之前的步骤3中没有被重用。
我的期望是spark将重用 Exchange
从步骤1开始( cluster by
)不会再加一个 Exchange
(在smj之前)在步骤3中,因为左分支 repartitioned
在查询的早期。
谁能解释一下我哪里出了问题吗。感谢您的帮助。
注意:我使用的是spark 2.4
谢谢
1条答案
按热度按时间ycl3bljg1#
通过不重命名查询中的列并将相同的列传播到下游查询,问题似乎得到了缓解。