为什么聚类在spark cogroup函数中似乎不起作用

g2ieeal7  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(404)

我有两个hive集群表t1和t2

CREATE EXTERNAL TABLE `t1`(
   `t1_req_id` string,
    ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS

// t2 looks similar with same amount of buckets

代码如下所示:

val t1 = spark.table("t1").as[T1].rdd.map(v => (v.t1_req_id, v))
 val t2=  spark.table("t2").as[T2].rdd.map(v => (v.t2_req_id, v))

 val outRdd = t1.cogroup(t2)
      .flatMap { coGroupRes =>
        val key = coGroupRes._1
        val value: (Iterable[T1], Iterable[T2])= coGroupRes._2
        val t3List = // create a list with some logic on Iterable[T1] and Iterable[T2]
        t3List
 }
 outRdd.write....

我确保t1和t2表都有相同数量的分区,并且在spark submit上有 spark.sql.sources.bucketing.enabled=true 以及 spark.sessionState.conf.bucketingEnabled=true 旗帜
但是spark-dag没有显示聚类的任何影响。似乎仍然有数据完全洗牌我错过了什么,任何其他配置,调音?如何确保没有完全的数据洗牌?我的spark版本是2.3.1

jutyujz0

jutyujz01#

它不应该出现。
任何逻辑优化仅限于 DataFrame 应用程序编程接口。一旦将数据推送到黑盒函数数据集api(请参阅spark 2.0 dataset vs dataframe)和更高版本的RDDAPI,就不会再将更多信息推回到优化器。
您可以通过执行join first来部分地利用bucketing,在这些行周围获得一些东西

spark.table("t1")
   .join(spark.table("t2"), $"t1.t1_req_id" === $"t2.t2_req_id", "outer")
   .groupBy($"t1.v.t1_req_id", $"t2.t2_req_id")
   .agg(...) // For example collect_set($"t1.v"), collect_set($"t2.v")

但是,与cogroup不同,这将在组中生成完全笛卡尔积,并且可能不适用于您的情况

相关问题