org.apache.flink.api.java.operators.GroupReduceOperator类的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(14.1k)|赞(0)|评价(0)|浏览(127)

本文整理了Java中org.apache.flink.api.java.operators.GroupReduceOperator类的一些代码示例,展示了GroupReduceOperator类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。GroupReduceOperator类的具体详情如下:
包路径:org.apache.flink.api.java.operators.GroupReduceOperator
类名称:GroupReduceOperator

GroupReduceOperator介绍

[英]This operator represents the application of a "reduceGroup" function on a data set, and the result data set produced by the function.
[中]此运算符表示“reduceGroup”函数在数据集上的应用,以及该函数生成的结果数据集。

代码示例

代码示例来源:origin: apache/flink

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
  DataSet<Edge<K, EV>> edges = input.getEdges();
  // annotate edges with degrees
  DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
      .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
  // project edges by degrees
  DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
  // project edges by vertex id
  DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
  DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
      // build triads
      .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
      .reduceGroup(new TriadBuilder<>())
      // filter triads
      .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
  return triangles;
}

代码示例来源:origin: apache/flink

.withForwardedFields("*").setParallelism(p).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");

代码示例来源:origin: apache/flink

.withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
  return edges.flatMap(new EmitOneEdgePerNode<>()).name("Emit edge")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: apache/flink

private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism)
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

代码示例来源:origin: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    DataSet<byte[]> op = sets.getDataSet(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First"));
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  } else if (sets.isSortedGrouping(info.parentID)) {
    SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  }
}

代码示例来源:origin: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
    .run(new EdgeDegreesPair<K, VV, EV>()
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
    .flatMap(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0, 1)
    .reduceGroup(new ReduceEdgeStats<>())
      .setParallelism(parallelism)
      .name("Reduce edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

代码示例来源:origin: apache/flink

.sortGroup(1, Order.ASCENDING)
  .reduceGroup(new GenerateGroupSpans<>())
    .setParallelism(parallelism)
    .name("Generate group spans");
  .sortGroup(2, Order.ASCENDING)
  .reduceGroup(new GenerateGroupPairs<>())
    .name("Generate group pairs");
  .groupBy(0, 1)
  .reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
    .name("Compute scores");
    .withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
    .flatMap(new MirrorResult<>())
      .name("Mirror results");
} else {

代码示例来源:origin: apache/flink

GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new CombineReducer2()).name("reducer");
reduced.setCombinable(true);
reduced.output(new DiscardingOutputFormat<Long>()).name("sink");

代码示例来源:origin: apache/flink

@Override
public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
    throws Exception {
  // s, t, bitmask
  DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
    .flatMap(new EmitAndFlipEdge<>())
      .setParallelism(parallelism)
      .name("Emit and flip edge")
    .groupBy(0, 1)
    .reduceGroup(new ReduceBitmask<>())
      .setParallelism(parallelism)
      .name("Reduce bitmask");
  // s, d(s)
  DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
    .groupBy(0)
    .reduceGroup(new DegreeCount<>())
      .setParallelism(parallelism)
      .name("Degree count");
  if (includeZeroDegreeVertices.get()) {
    vertexDegrees = input.getVertices()
      .leftOuterJoin(vertexDegrees)
      .where(0)
      .equalTo(0)
      .with(new JoinVertexWithVertexDegrees<>())
        .setParallelism(parallelism)
        .name("Zero degree vertices");
  }
  return vertexDegrees;
}

代码示例来源:origin: apache/flink

.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());

代码示例来源:origin: dataArtisans/cascading-flink

private DataSet<Tuple> translateGlobalGroupBy(DataSet<Tuple> input, FlowNode node, int dop,
                        String[] sortKeys, Order sortOrder, Fields outFields) {
  DataSet<Tuple> result = input;
  // sort on sorting keys if necessary
  if(sortKeys != null && sortKeys.length > 0) {
    result = result
        .sortPartition(sortKeys[0], sortOrder)
        .setParallelism(1)
        .name("reduce-"+ node.getID());
    for(int i=1; i<sortKeys.length; i++) {
      result = result
          .sortPartition(sortKeys[i], sortOrder)
          .setParallelism(1);
    }
  }
  // group all data
  return result
      .reduceGroup(new GroupByReducer(node))
      .returns(new TupleTypeInfo(outFields))
      .withParameters(this.getFlinkNodeConfig(node))
      .setParallelism(dop)
      .name("reduce-"+ node.getID());
}

代码示例来源:origin: apache/flink

.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

代码示例来源:origin: apache/flink

.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);

代码示例来源:origin: org.apache.flink/flink-gelly_2.10

.sortGroup(1, Order.ASCENDING)
.reduceGroup(new GenerateGroupSpans<K>())
  .setParallelism(littleParallelism)
  .name("Generate group spans");
.sortGroup(2, Order.ASCENDING)
.reduceGroup(new GenerateGroupPairs<K>())
  .name("Generate group pairs");
.groupBy(0, 1)
.reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio))
  .name("Compute scores");
  .withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);

代码示例来源:origin: apache/flink

/**
 * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
 * The GroupReduceFunction can iterate over all elements of the DataSet and emit any
 *   number of output elements including none.
 *
 * @param reducer The GroupReduceFunction that is applied on the DataSet.
 * @return A GroupReduceOperator that represents the reduced DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 * @see DataSet
 */
public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
  if (reducer == null) {
    throw new NullPointerException("GroupReduce function must not be null.");
  }
  String callLocation = Utils.getCallLocationName();
  TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true);
  return new GroupReduceOperator<>(this, resultType, clean(reducer), callLocation);
}

代码示例来源:origin: apache/flink

@Test
public void testWithKryoGenericSer() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().enableForceKryo();
  Path in = new Path(inFile.getAbsoluteFile().toURI());
  AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
  DataSet<User> usersDS = env.createInput(users);
  DataSet<Tuple2<String, Integer>> res = usersDS
    .groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
    .reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
      for (User u : values) {
        out.collect(new Tuple2<>(u.getName().toString(), 1));
      }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT));
  res.writeAsText(resultPath);
  env.execute("Avro Key selection");
  expected = "(Charlie,1)\n(Alyssa,1)\n";
}

代码示例来源:origin: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<String> initialInput = env.fromElements("1", "2", "3", "4", "5").name("input");
  IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
  DataSet<String> sumReduce = iteration.reduceGroup(new SumReducer()).name("Compute sum (GroupReduce");
  DataSet<String> terminationFilter = iteration.filter(new TerminationFilter()).name("Compute termination criterion (Map)");
  List<String> result = iteration.closeWith(sumReduce, terminationFilter).collect();
  containsResultAsText(result, EXPECTED);
}

代码示例来源:origin: apache/flink

@Test
public void testSemanticPropsWithKeySelector5() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
  GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
      tupleDs.groupBy(new DummyTestKeySelector())
          .reduceGroup(new DummyGroupReduceFunction3())
          .withForwardedFields("4->0;3;3->1;2");
  SemanticProperties semProps = reduceOp.getSemanticProperties();
  assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
  assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
  assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
  assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
  assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
  assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
  assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
  assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
  assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
  assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
  assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
  assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
  assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
  assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
  assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
  assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
  assertTrue(semProps.getReadFields(0) == null);
}

代码示例来源:origin: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
  IterativeDataSet<Integer> iteration = data.iterate(10);
  DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");
  final List<Integer> resultList = new ArrayList<Integer>();
  iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));
  env.execute();
  Assert.assertEquals(8, resultList.get(0).intValue());
}

代码示例来源:origin: apache/flink

@Test
public void testFirstNOnUngroupedDS() throws Exception {
  /*
   * First-n on ungrouped data set
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
  List<Tuple1<Integer>> result = seven.collect();
  String expected = "(7)\n";
  compareResultAsText(result, expected);
}

相关文章