This article collects code examples for the Java class org.apache.flink.api.java.operators.GroupReduceOperator and shows how the class is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the GroupReduceOperator class:
Package path: org.apache.flink.api.java.operators.GroupReduceOperator
Class name: GroupReduceOperator
This operator represents the application of a "reduceGroup" function on a data set, and the result data set produced by the function.
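Before the examples, the contract behind `groupBy(...).reduceGroup(...)` — one function call per key group, which may emit any number of results — can be mimicked with plain Java collections. The following is a minimal, hypothetical sketch (the class, method, and data are invented for illustration; this is not the Flink API or runtime):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupReduceSketch {

    // Mimics groupBy(key).reduceGroup(...): partition elements by key,
    // then run one reduce call per group, emitting "key=count" per group.
    static List<String> countPerGroup(List<String> words) {
        // "groupBy": bucket elements by a key (here, the word itself)
        Map<String, List<String>> groups = words.stream()
                .collect(Collectors.groupingBy(w -> w));

        // "reduceGroup": each group is handed to one function invocation,
        // which may emit zero, one, or many output elements
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            output.add(group.getKey() + "=" + group.getValue().size());
        }
        Collections.sort(output);
        return output;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("flink", "java", "flink", "api", "java", "flink");
        System.out.println(countPerGroup(words)); // [api=1, flink=3, java=2]
    }
}
```

In Flink the same per-group call receives an `Iterable` of the group's elements and a `Collector` for the outputs; the sketch above only illustrates the grouping and per-group-call semantics, not distribution or combiners.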
Code example source: origin: apache/flink

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
    DataSet<Edge<K, EV>> edges = input.getEdges();

    // annotate edges with degrees
    DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
            .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
            .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());

    // project edges by degrees
    DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
    // project edges by vertex id
    DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());

    DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
            // build triads
            .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
            .reduceGroup(new TriadBuilder<>())
            // filter triads
            .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());

    return triangles;
}
Code example source: origin: apache/flink

        .withForwardedFields("*").setParallelism(p).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p).name("Reduce1")
        .map(new IdentityMapper<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Map2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
        .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
        .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
Code example source: origin: apache/flink

            .withForwardedFields("f1->f0")
            .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
            .name("GroupReduce on in-edges").returns(typeInfo);
    case OUT:
        return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
            .withForwardedFields("f0")
            .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
            .name("GroupReduce on out-edges").returns(typeInfo);
    case ALL:
        return edges.flatMap(new EmitOneEdgePerNode<>()).name("Emit edge")
            .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
            .name("GroupReduce on in- and out-edges").returns(typeInfo);
    default:
        throw new IllegalArgumentException("Illegal edge direction");
Code example source: origin: apache/flink

private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
    return op1
        .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism)
        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
        .setParallelism(info.parallelism).name(info.name);
}
Code example source: origin: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
    if (sets.isDataSet(info.parentID)) {
        DataSet<byte[]> op = sets.getDataSet(info.parentID);
        sets.add(info.setID, op
            .first(info.count).setParallelism(info.parallelism).name("First"));
    } else if (sets.isUnsortedGrouping(info.parentID)) {
        UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
        sets.add(info.setID, op
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
    } else if (sets.isSortedGrouping(info.parentID)) {
        SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
        sets.add(info.setID, op
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
    }
}
Code example source: origin: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
        throws Exception {
    super.run(input);

    // s, t, (d(s), d(t))
    DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
        .run(new EdgeDegreesPair<K, VV, EV>()
            .setParallelism(parallelism));

    // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
    DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
        .flatMap(new EdgeStats<>())
        .setParallelism(parallelism)
        .name("Edge stats")
        .groupBy(0, 1)
        .reduceGroup(new ReduceEdgeStats<>())
        .setParallelism(parallelism)
        .name("Reduce edge stats")
        .groupBy(0)
        .reduce(new SumEdgeStats<>())
        .setCombineHint(CombineHint.HASH)
        .setParallelism(parallelism)
        .name("Sum edge stats");

    edgeMetricsHelper = new EdgeMetricsHelper<>();

    edgeStats
        .output(edgeMetricsHelper)
        .setParallelism(parallelism)
        .name("Edge metrics");

    return this;
}
Code example source: origin: apache/flink

        .sortGroup(1, Order.ASCENDING)
        .reduceGroup(new GenerateGroupSpans<>())
        .setParallelism(parallelism)
        .name("Generate group spans");
        .sortGroup(2, Order.ASCENDING)
        .reduceGroup(new GenerateGroupPairs<>())
        .name("Generate group pairs");
        .groupBy(0, 1)
        .reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
        .name("Compute scores");
        .withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
        .flatMap(new MirrorResult<>())
        .name("Mirror results");
} else {
Code example source: origin: apache/flink

GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new CombineReducer2()).name("reducer");
reduced.setCombinable(true);
reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
Code example source: origin: apache/flink

@Override
public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
        throws Exception {
    // s, t, bitmask
    DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
        .flatMap(new EmitAndFlipEdge<>())
        .setParallelism(parallelism)
        .name("Emit and flip edge")
        .groupBy(0, 1)
        .reduceGroup(new ReduceBitmask<>())
        .setParallelism(parallelism)
        .name("Reduce bitmask");

    // s, d(s)
    DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
        .groupBy(0)
        .reduceGroup(new DegreeCount<>())
        .setParallelism(parallelism)
        .name("Degree count");

    if (includeZeroDegreeVertices.get()) {
        vertexDegrees = input.getVertices()
            .leftOuterJoin(vertexDegrees)
            .where(0)
            .equalTo(0)
            .with(new JoinVertexWithVertexDegrees<>())
            .setParallelism(parallelism)
            .name("Zero degree vertices");
    }

    return vertexDegrees;
}
Code example source: origin: apache/flink

.join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
Code example source: origin: dataArtisans/cascading-flink

private DataSet<Tuple> translateGlobalGroupBy(DataSet<Tuple> input, FlowNode node, int dop,
        String[] sortKeys, Order sortOrder, Fields outFields) {

    DataSet<Tuple> result = input;

    // sort on sorting keys if necessary
    if (sortKeys != null && sortKeys.length > 0) {
        result = result
            .sortPartition(sortKeys[0], sortOrder)
            .setParallelism(1)
            .name("reduce-" + node.getID());
        for (int i = 1; i < sortKeys.length; i++) {
            result = result
                .sortPartition(sortKeys[i], sortOrder)
                .setParallelism(1);
        }
    }

    // group all data
    return result
        .reduceGroup(new GroupByReducer(node))
        .returns(new TupleTypeInfo(outFields))
        .withParameters(this.getFlinkNodeConfig(node))
        .setParallelism(dop)
        .name("reduce-" + node.getID());
}
Code example source: origin: apache/flink

        .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2")
        .groupBy(0)
        .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long, Long, Long>>())
        .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Code example source: origin: apache/flink

        .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
Code example source: origin: org.apache.flink/flink-gelly_2.10

        .sortGroup(1, Order.ASCENDING)
        .reduceGroup(new GenerateGroupSpans<K>())
        .setParallelism(littleParallelism)
        .name("Generate group spans");
        .sortGroup(2, Order.ASCENDING)
        .reduceGroup(new GenerateGroupPairs<K>())
        .name("Generate group pairs");
        .groupBy(0, 1)
        .reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio))
        .name("Compute scores");
        .withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
Code example source: origin: apache/flink

/**
 * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
 * The GroupReduceFunction can iterate over all elements of the DataSet and emit any
 * number of output elements including none.
 *
 * @param reducer The GroupReduceFunction that is applied on the DataSet.
 * @return A GroupReduceOperator that represents the reduced DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 * @see DataSet
 */
public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
    if (reducer == null) {
        throw new NullPointerException("GroupReduce function must not be null.");
    }
    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true);
    return new GroupReduceOperator<>(this, resultType, clean(reducer), callLocation);
}
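The Javadoc above highlights the key point of a non-grouped reduceGroup: the function is invoked once with the entire data set and may emit any number of outputs, including none. That contract can be sketched in plain Java (the class and method names below are invented for illustration; this is not Flink code):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AllReduceSketch {

    // One pass over the whole "data set", emitting an arbitrary number of
    // results: zero for empty input, two (min and max) otherwise.
    static List<Integer> minAndMax(Iterable<Integer> values) {
        Integer min = null;
        Integer max = null;
        for (int v : values) {
            if (min == null || v < min) min = v;
            if (max == null || v > max) max = v;
        }
        // emit nothing for an empty input, two elements otherwise
        return min == null ? Collections.emptyList() : Arrays.asList(min, max);
    }

    public static void main(String[] args) {
        System.out.println(minAndMax(Arrays.asList(4, 1, 7, 3))); // [1, 7]
    }
}
```

Note that because all elements flow through a single function call, a non-grouped reduceGroup cannot run in parallel; in Flink that is why it is typically reserved for small or pre-aggregated data sets.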
Code example source: origin: apache/flink

@Test
public void testWithKryoGenericSer() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableForceKryo();

    Path in = new Path(inFile.getAbsoluteFile().toURI());

    AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
    DataSet<User> usersDS = env.createInput(users);

    DataSet<Tuple2<String, Integer>> res = usersDS
        .groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
        .reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
            for (User u : values) {
                out.collect(new Tuple2<>(u.getName().toString(), 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    res.writeAsText(resultPath);
    env.execute("Avro Key selection");

    expected = "(Charlie,1)\n(Alyssa,1)\n";
}
Code example source: origin: apache/flink

@Override
protected void testProgram() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataSet<String> initialInput = env.fromElements("1", "2", "3", "4", "5").name("input");

    IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");

    DataSet<String> sumReduce = iteration.reduceGroup(new SumReducer()).name("Compute sum (GroupReduce)");
    DataSet<String> terminationFilter = iteration.filter(new TerminationFilter()).name("Compute termination criterion (Map)");

    List<String> result = iteration.closeWith(sumReduce, terminationFilter).collect();

    containsResultAsText(result, EXPECTED);
}
Code example source: origin: apache/flink

@Test
public void testSemanticPropsWithKeySelector5() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

    GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
        tupleDs.groupBy(new DummyTestKeySelector())
            .reduceGroup(new DummyGroupReduceFunction3())
            .withForwardedFields("4->0;3;3->1;2");

    SemanticProperties semProps = reduceOp.getSemanticProperties();

    assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
    assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
    assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
    assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
    assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
    assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
    assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
    assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
    assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
    assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
    assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
    assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
    assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
    assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
    assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
    assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
    assertTrue(semProps.getReadFields(0) == null);
}
Code example source: origin: apache/flink

@Override
protected void testProgram() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);

    IterativeDataSet<Integer> iteration = data.iterate(10);

    DataSet<Integer> result = data.reduceGroup(new PickOneAllReduce()).withBroadcastSet(iteration, "bc");

    final List<Integer> resultList = new ArrayList<Integer>();
    iteration.closeWith(result).output(new LocalCollectionOutputFormat<Integer>(resultList));

    env.execute();

    Assert.assertEquals(8, resultList.get(0).intValue());
}
Code example source: origin: apache/flink

@Test
public void testFirstNOnUngroupedDS() throws Exception {
    /*
     * First-n on ungrouped data set
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);

    List<Tuple1<Integer>> result = seven.collect();

    String expected = "(7)\n";
    compareResultAsText(result, expected);
}