scala将spark数据集中的两个集群表连接起来,结果似乎是完全无序的

mkh04yzy  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(330)

我有两个hive集群表t1和t2

CREATE EXTERNAL TABLE `t1`(
  `t1_req_id` string,
   ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS
// t2 looks similar with same amount of buckets

插入部分发生在配置单元中

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table `t1` partition(t1_stats_date,t1_stats_hour)
   select *
   from t1_raw
   where t1_stats_date='2020-05-10' and t1_stats_hour='12' AND 
   t1_req_id is not null

代码如下所示:

val t1 = spark.table("t1").as[T1]
 val t2=  spark.table("t2").as[T2]
 val outDS = t1.joinWith(t2, t1("t1_req_id) === t2("t2_req_id), "fullouter")
  .map { case (t1Obj, t2Obj) =>
    val t3:T3 = // do some logic
    t3 
  }
 outDS.toDF.write....

我看到了dag中的投影-但是看起来这个作业仍然执行完整的数据洗牌,而查看executor的日志时,我没有看到它在一个块中读取两个表的同一个存储桶-这就是我希望找到的
spark.sql.sources.bucketing.enabled , spark.sessionState.conf.bucketingEnabled 以及 spark.sql.join.preferSortMergeJoin 旗帜
我错过了什么?如果有带扣子的table,为什么还有全套的洗牌呢?目前的spark版本是2.3.1

alen0pnh

alen0pnh1#

这里要检查的一种可能性是类型是否不匹配。e、 g.如果联接列的类型是t1中的string和t2中的bigint。即使两个类型都是整数(例如,一个是int,另一个是bigint),spark仍然会在这里添加shuffle,因为不同的类型使用不同的哈希函数进行bucketing。

相关问题