我有两个hive集群表t1和t2
CREATE EXTERNAL TABLE `t1`(
`t1_req_id` string,
...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS
// t2 looks similar with same amount of buckets
代码如下所示:
val t1 = spark.table("t1").as[T1].rdd.map(v => (v.t1_req_id, v))
val t2= spark.table("t2").as[T2].rdd.map(v => (v.t2_req_id, v))
val outRdd = t1.cogroup(t2)
.flatMap { coGroupRes =>
val key = coGroupRes._1
val value: (Iterable[T1], Iterable[T2])= coGroupRes._2
val t3List = // create a list with some logic on Iterable[T1] and Iterable[T2]
t3List
}
outRdd.write....
我确保t1和t2表都有相同数量的分区,并且在spark submit上有 spark.sql.sources.bucketing.enabled=true
以及 spark.sessionState.conf.bucketingEnabled=true
旗帜
但是spark-dag没有显示聚类的任何影响。似乎仍然有数据完全洗牌我错过了什么,任何其他配置,调音?如何确保没有完全的数据洗牌?我的spark版本是2.3.1
1条答案
按热度按时间jutyujz01#
它不应该出现。
任何逻辑优化仅限于
DataFrame
应用程序编程接口。一旦将数据推送到黑盒函数数据集api(请参阅spark 2.0 dataset vs dataframe)和更高版本的RDDAPI,就不会再将更多信息推回到优化器。您可以通过执行join first来部分地利用bucketing,在这些行周围获得一些东西
但是,与cogroup不同,这将在组中生成完全笛卡尔积,并且可能不适用于您的情况