Usage of the org.apache.flink.api.java.operators.GroupReduceOperator.groupBy() method, with code examples


This article collects Java code examples for the org.apache.flink.api.java.operators.GroupReduceOperator.groupBy() method and shows how it is used in practice. The examples are taken from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, and are intended as practical references. Details of GroupReduceOperator.groupBy() are as follows:
Package: org.apache.flink.api.java.operators
Class: GroupReduceOperator
Method: groupBy

GroupReduceOperator.groupBy overview

No description is available for this method.
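
Since no description is provided, here is a minimal, self-contained sketch of how the method is typically used. The class name GroupByOnGroupReduceSketch, the sample data, and the counting logic are illustrative assumptions, not taken from any of the projects quoted below; only the Flink DataSet API calls (fromElements, groupBy, reduceGroup, reduce, print) are real. reduceGroup() returns a GroupReduceOperator, which is itself a DataSet, so groupBy() can be called on it to re-group its output by a tuple field position before chaining another group-wise transformation, which is the pattern that appears in the examples below.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical example class, for illustration only.
public class GroupByOnGroupReduceSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<String, Integer>> input = env.fromElements(
        Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

    // reduceGroup() returns a GroupReduceOperator, which is itself a DataSet;
    // here it counts the records per key.
    GroupReduceOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> perKeyCounts =
        input.groupBy(0).reduceGroup(
            new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
              @Override
              public void reduce(Iterable<Tuple2<String, Integer>> values,
                  Collector<Tuple2<String, Integer>> out) {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> value : values) {
                  key = value.f0;
                  count++;
                }
                out.collect(Tuple2.of(key, count));
              }
            });

    // groupBy(0) on the GroupReduceOperator groups its output by the first
    // tuple field, so a further group-wise reduce can be chained.
    DataSet<Tuple2<String, Integer>> totals = perKeyCounts
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
              Tuple2<String, Integer> b) {
            return Tuple2.of(a.f0, a.f1 + b.f1);
          }
        });

    totals.print();
  }
}

In this toy sketch the second grouping sees only one record per key and simply passes the counts through; in the real examples below the re-grouping combines partial results (degree annotations, edge statistics) produced by an earlier reduceGroup.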

Code examples

Code example source: apache/flink

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
  DataSet<Edge<K, EV>> edges = input.getEdges();
  // annotate edges with degrees
  DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
      .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
  // project edges by degrees
  DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
  // project edges by vertex id
  DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
  DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
      // build triads
      .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
      .reduceGroup(new TriadBuilder<>())
      // filter triads
      .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
  return triangles;
}

Code example source: apache/flink

.groupBy(0).reduceGroup(
    new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
      @Override
      public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values,
          Collector<Tuple2<Integer, Long>> out) { /* body elided in the original excerpt */ }
    })

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
    .run(new EdgeDegreesPair<K, VV, EV>()
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
    .flatMap(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0, 1)
    .reduceGroup(new ReduceEdgeStats<>())
      .setParallelism(parallelism)
      .name("Reduce edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: apache/flink

.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())

Code example source: apache/flink

.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(1)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())

Code example source: org.apache.flink/flink-gelly_2.10

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
  DataSet<Edge<K, EV>> edges = input.getEdges();
  // annotate edges with degrees
  DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
      .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
  // project edges by degrees
  DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
  // project edges by vertex id
  DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
  DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
      // build triads
      .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
      .reduceGroup(new TriadBuilder<K>())
      // filter triads
      .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
  return triangles;
}

Code example source: org.apache.flink/flink-gelly_2.10

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
    .run(new EdgeDegreesPair<K, VV, EV>()
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
    .flatMap(new EdgeStats<K, EV>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0, 1)
    .reduceGroup(new ReduceEdgeStats<K>())
      .setParallelism(parallelism)
      .name("Reduce edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<K>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: org.apache.flink/flink-java-examples

.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner());
