apachespark:使数据集上的多列groupby与tin分区一起工作

5rgfhyps  于 2021-07-09  发布在  Spark
关注(0)|答案(2)|浏览(228)

我确定每个城市活动部门的员工和企业数量:

|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                       |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+---------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|84.11Z      |O      |Administration publique générale                                                                                                 |3                |153.5         |169.5       |
|14654      |Saint-Pierre-en-Auge|16.24Z      |C      |Fabrication d'emballages en bois                                                                                                 |1                |149.5         |150.5       |
|14654      |Saint-Pierre-en-Auge|10.11Z      |C      |Transformation et conservation de la viande de boucherie                                                                         |1                |149.5         |150.5       |

具有分组级别( regroupement 以下)由用户设置:

+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|codeCommune|nomCommune          |regroupement|section|libelleAPE                                                                                                                                |nombreEntreprises|nombreSalaries|nombreActifs|
+-----------+--------------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------+-----------------+--------------+------------+
|14654      |Saint-Pierre-en-Auge|10          |C      |Industries alimentaires                                                                                                                   |16               |208.0         |225.0       |
|14654      |Saint-Pierre-en-Auge|86          |Q      |Activités pour la santé humaine                                                                                                           |46               |169.5         |218.5       |
|14654      |Saint-Pierre-en-Auge|84          |O      |Administration publique et défense ; sécurité sociale obligatoire                                                                         |5                |153.5         |171.5       |

工作是这样做的:
Dataset 对于按部门代码(大致为城市代码的前两个字符)划分的企业和机构,选择以下列: city_code , city_name , grouping (我们保留的活动代码部分: 84.11Z 或者 84 ), section (概括一项活动的部门的代码:工业、商业等), activity_description , siren (企业标识符:一个企业可能有许多机构), number_of_workers , number_of_actives_peoplegroupBy 完成时间:

RelationalGroupedDataset group = enterprisesAndEstablishments
   .groupBy("city_code", "city_name", "grouping", "section", "activity_description");

我通过聚合执行计算,然后:

group.agg(countDistinct("siren").as("nombreEntreprises"), 
   sum("number_of_workers").as("nombreSalaries"),
   sum("number_of_actives_people").as("nombreActifs"));

我的问题是 groupBy 方法不关心数据集分区,而是从数据集的任何分区收集数据 enterprisesAndEstablishments 对大量数据进行全局排序,
当只针对一个部分时效率会更高:此示例中的所有活动都在分区中 [codeDepartement=14] .
我希望它尊重这些分区并这样做 groupBy 以避免混乱。
我在找 sortWithPartitions 的同伴 groupBy . 一些被称为 groupWithinPartitions 但我没找到。
实现我所追求的目标的最佳方式是什么,
或者如果没有工具,我应该选择什么替代方法?

vs91vp4v

vs91vp4v1#

如果您绝对确定每个分区包含属于该组的所有行,那么可以使用mappartitions在纯java中计算每个组的值,而不使用spark。
这样,分组只在一个分区内进行,不会发生洗牌。缺点是不能使用spark的聚合函数。它们必须被纯java解决方案所取代,例如流。由于这需要相当多的代码行,我不确定是否值得付出努力。

Dataset<Row> aggregated = enterprisesAndEstablishments.mapPartitions((Iterator<Row> r) -> {

    //transform iterator to stream
    Iterable<Row> iterable = () -> r;
    Stream<Row> targetStream = StreamSupport.stream(iterable.spliterator(), false);

    //construct the collectors for the three output columns
    //assumption: the data type is Integer. It could be replaced by Long if necessary
    Collector<Row, ?, Integer> distinctSiren = Collectors.collectingAndThen(Collectors.mapping((Row s) -> s.<String>getAs("siren"), Collectors.toSet()), Set::size);
    Collector<Row, ?, Integer> sumOverWorkers = Collectors.summingInt((Row s) -> s.<Integer>getAs("number_of_workers"));
    Collector<Row, ?, Integer> sumOverActive = Collectors.summingInt((Row s) -> s.<Integer>getAs("number_of_actives_people"));

    //Combine the the three collectors
    Collector<Row, ?, List<Integer>> combinedCollectors =
            combine(Arrays.asList(distinctSiren, sumOverWorkers, sumOverActive));

    //execute the grouping
    Map<List<Object>, List<Integer>> collect = targetStream.collect(Collectors.groupingBy(s -> {
        return new ArrayList<>(Arrays.asList(s.<Long>getAs("city_code"),
                s.<String>getAs("city_name"),
                s.<String>getAs("grouping"),
                s.<String>getAs("section"),
                s.<String>getAs("activity_description")));
    }, combinedCollectors));

    //merge the lists produced by the grouping operation
    collect.forEach(List::addAll);

    //back to Spark rows
    List<Row> result = new ArrayList<>();
    collect.keySet().forEach(l -> result.add(RowFactory.create(l.toArray())));
    return result.iterator();
}, RowEncoder.apply(new StructType().add("city_code", DataTypes.IntegerType)
        .add("city_name", DataTypes.StringType)
        .add("grouping", DataTypes.StringType)
        .add("section", DataTypes.StringType)
        .add("activity_description", DataTypes.StringType)
        .add("nombreEntreprises", DataTypes.IntegerType)
        .add("nombreSalaries", DataTypes.IntegerType)
        .add("nombreActifs", DataTypes.IntegerType)));

如果分区是正确的,则 aggregated Dataframe将与中的相同 group.agg(...) 在这个问题上。
方法 combine 用于组合三个采集器的参数取自此问题和此链接:

public static <V, R> Collector<V, ?, List<R>> combine(List<Collector<V, ?, R>> collectors) {

    final Supplier<List<Object>> supplier = () -> collectors.stream().map(Collector::supplier)
            .map(Supplier::get).collect(Collectors.toList());

    final BiConsumer<List<Object>, V> biConsumer = (List<Object> list, V e) -> IntStream.range(0, collectors.size())//
            .forEach(i -> ((BiConsumer<Object, V>) collectors.get(i).accumulator()).accept(list.get(i), e));

    final BinaryOperator<List<Object>> binaryOperator = (List<Object> l1, List<Object> l2) -> {
        IntStream.range(0, collectors.size()).forEach(
                i -> l1.set(i, ((BinaryOperator<Object>) collectors.get(i).combiner()).apply(l1.get(i), l2.get(i))));
        return l1;
    };

    Function<List<Object>, List<R>> function = (List<Object> list) -> 
    IntStream.range(0, collectors.size()).mapToObj(
            i ->((Function<Object, R>) collectors.get(i).finisher()).apply(list.get(i))).collect(Collectors.toList());

    return Collector.of( supplier, biConsumer, binaryOperator, function);
}
mefy6pfw

mefy6pfw2#

您可以使用rdd低级函数实现同样的功能 aggregateByKey 它是聚合函数之一(其他函数是 reduceByKey & groupByKey )可在Spark一个区别,使它成为一个强大的三个之一。
聚合键不需要对同一数据类型进行操作,可以在分区内进行不同的聚合(最大、最小、平均、总和和计数),并在分区之间进行不同的聚合。

case class EnterpriseEmp(
    city_code: Long,
    city_name: String,
    grouping: Int,
    section: String,
    activity_description: String,
    siren: String,
    number_of_workers: Long,
    number_of_actives_people: Long
)

val empList =
      Array(
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 100, 100),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 150, 200),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 10, "C", "Industries alimentaires", "A1", 200, 300),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1000, 1001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 1050, 2001),
        EnterpriseEmp(14654, "Saint-Pierre-en-Auge", 86, "Q", "Activités pour la santé humaine", "B1", 2000, 3001)
      )
val data = sc.parallelize(empList)
val keyed = data.keyBy(key =>
  (
    key.city_code,
    key.city_name,
    key.grouping,
    key.section,
    key.activity_description
  )
)

aggregatebykey需要3个主要输入:
zerovalue:初始值,它不会影响聚合值。
组合器函数:该函数接受两个参数。第二个参数合并到第一个参数中。此函数用于组合/合并单个分区中的值。
reduce/merge函数:这个函数还接受两个参数。在这里,参数被合并成一个跨rdd分区的参数。

val init_value = (0L, 0L, 0L) //sum("number_of_workers"), sum("number_of_actives_people"), count("siren")
val combinerFunc = (inter: (Long, Long, Long), value: EnterpriseEmp) => {
  (
    inter._1 + value.number_of_workers,
    inter._2 + value.number_of_actives_people,
    inter._3 + 1
  )
}
val reduceFunc = (p1: (Long, Long, Long), p2: (Long, Long, Long)) => {
  (p1._1 + p2._1, p1._2 + p2._2, p1._3 + p2._3)
}
val output = keyed.aggregateByKey(init_value)(combinerFunc, reduceFunc)

输出:

output.collect.foreach(println)
((14654,Saint-Pierre-en-Auge,86,Q,Activités pour la santé humaine),(4050,6003,3))
((14654,Saint-Pierre-en-Auge,10,C,Industries alimentaires),(450,600,3))

相关问题