本文整理了Java中org.apache.flink.api.java.operators.UnsortedGrouping类的一些代码示例,展示了UnsortedGrouping类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。UnsortedGrouping类的具体详情如下:
包路径:org.apache.flink.api.java.operators.UnsortedGrouping
类名称:UnsortedGrouping
[英]A Grouping that is unsorted.
[中]未排序的分组。
代码示例来源:origin: apache/flink
/**
 * Builds a new set holding the first n elements of each group in this grouped {@link DataSet}.
 *
 * @param n The desired number of elements for each group; must be at least 1.
 * @return A GroupReduceOperator that represents the DataSet containing the elements.
 * @throws InvalidProgramException If {@code n} is smaller than 1.
 */
public GroupReduceOperator<T, T> first(int n) {
    if (n >= 1) {
        return reduceGroup(new FirstReducer<T>(n));
    }
    // Anything below 1 is rejected.
    throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
}
代码示例来源:origin: apache/flink
/**
 * Registers a group-sort step for the given operation.
 *
 * <p>The parent set is looked up by {@code info.parentID}; the sorted result is stored
 * under {@code info.setID}. If the parent is neither a DataSet nor a grouping, nothing is registered.
 *
 * @param info Operation description providing parent ID, set ID, sort field and sort order.
 * @throws IllegalArgumentException If the parent is a plain DataSet.
 */
private void createSortOperation(PythonOperationInfo info) {
    // A plain (ungrouped) DataSet parent is rejected outright.
    if (sets.isDataSet(info.parentID)) {
        throw new IllegalArgumentException("sort() can not be applied on a DataSet.");
    }

    if (sets.isUnsortedGrouping(info.parentID)) {
        sets.add(info.setID, sets.getUnsortedGrouping(info.parentID).sortGroup(info.field, info.order));
        return;
    }

    // An already sorted grouping gets an additional sort key.
    if (sets.isSortedGrouping(info.parentID)) {
        sets.add(info.setID, sets.getSortedGrouping(info.parentID).sortGroup(info.field, info.order));
    }
}
代码示例来源:origin: apache/flink
/**
 * Computes the degree of every vertex in the graph by summing its in- and out-degree.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
    // Put out-degree and in-degree records into one set ...
    DataSet<Tuple2<K, LongValue>> inAndOutDegrees = outDegrees()
        .union(inDegrees()).name("In- and out-degree");

    // ... then add up field 1 for each distinct value of field 0.
    return inAndOutDegrees
        .groupBy(0).sum(1).name("Sum");
}
代码示例来源:origin: apache/flink
/**
 * Builds the triangle-enumeration data flow on the edges of the given graph.
 *
 * @param input The graph whose edges are processed.
 * @return A DataSet of {@code Tuple3<K, K, K>} produced by joining the built triads with the edge set.
 * @throws Exception Declared by the overridden method.
 */
@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
    // Step 1: annotate edges with degrees.
    DataSet<EdgeWithDegrees<K>> degreeAnnotated = input.getEdges().flatMap(new EdgeDuplicator<>())
        .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
        .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());

    // Step 2: project edges by degrees.
    DataSet<Edge<K, NullValue>> byDegree = degreeAnnotated.map(new EdgeByDegreeProjector<>());

    // Step 3: project edges by vertex id.
    DataSet<Edge<K, NullValue>> byId = byDegree.map(new EdgeByIdProjector<>());

    // Step 4: build triads from the degree-projected edges, then filter them against the id-projected edges.
    return byDegree
        .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
        .reduceGroup(new TriadBuilder<>())
        .join(byId, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
}
代码示例来源:origin: apache/flink
/**
 * First-n on a grouped data set: keeps four elements per group (grouped on field 1),
 * maps them with {@code OneMapper2} and sums field 1 per value of field 0.
 */
@Test
public void testFirstNOnGroupedDS() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);

    // Take the first four elements of every group.
    DataSet<Tuple3<Integer, Long, String>> firstFour = input.groupBy(1).first(4);

    // Aggregate the mapped records so the result can be compared as text.
    DataSet<Tuple2<Long, Integer>> summed = firstFour
        .map(new OneMapper2()).groupBy(0).sum(1);

    List<Tuple2<Long, Integer>> actual = summed.collect();
    String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
    compareResultAsText(actual, expected);
}
代码示例来源:origin: apache/flink
/**
 * Runs a grouped reduce over generated records and measures the wall-clock time of {@code count()}.
 *
 * @param iterator Iterator handed to the {@code SplittableRandomIterator} source.
 * @param typeInfo Type information of the generated records.
 * @param hint Combine hint set on the reduce operator.
 * @param numRecords Number of records passed to the source iterator.
 * @param print Whether the measured time is printed to standard out.
 * @throws Exception If the job execution fails.
 */
private static <T, B extends CopyableIterator<T>> void testReducePerformance(
        B iterator, TypeInformation<T> typeInfo, CombineHint hint, int numRecords, boolean print) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();

    SplittableRandomIterator<T, B> source = new SplittableRandomIterator<T, B>(numRecords, iterator);

    @SuppressWarnings("unchecked")
    DataSet<T> output =
        env.fromParallelCollection(source, typeInfo)
            .groupBy("0")
            .reduce(new SumReducer()).setCombineHint(hint);

    // Only the count() call (printed as-is) is timed.
    long startMillis = System.currentTimeMillis();
    System.out.println(output.count());
    long endMillis = System.currentTimeMillis();

    if (!print) {
        return;
    }
    long elapsedMillis = endMillis - startMillis;
    System.out.println("=== Time for " + iterator.getClass().getSimpleName() + " with hint " + hint.toString() + ": " + elapsedMillis + "ms ===");
}
代码示例来源:origin: apache/flink
/**
 * Convenience method that applies {@code Aggregations.MIN} to the given field.
 *
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the min'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> min(int field) {
    // The call location name is obtained directly in this method and passed along.
    final String callLocation = Utils.getCallLocationName();
    return this.aggregate(Aggregations.MIN, field, callLocation);
}
代码示例来源:origin: apache/flink
/**
 * Assembles a delta iteration over the given vertices and edges.
 *
 * <p>The vertices serve as both initial solution set and initial workset; the iteration is
 * created with a maximum of 100 iterations and key position 0.
 *
 * @param vertices Input used as initial solution set and workset.
 * @param edges Edge set joined against the workset and the reduced candidates.
 * @return The result of closing the delta iteration.
 */
public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = vertices.iterateDelta(vertices, 100, 0);

    // Join the workset with the edges on field 0 and keep field 1 of the edge side.
    DataSet<Tuple1<Long>> candidateIds = iteration.getWorkset().join(edges).where(0).equalTo(0)
        .projectSecond(1);

    DataSet<Tuple1<Long>> reducedCandidates = candidateIds.groupBy(0).reduceGroup(new Reduce101());

    // Match reduced candidates against field 1 of the edges and keep both edge fields.
    DataSet<Tuple2<Long, Long>> dependencies =
        reducedCandidates.join(edges).where(0).equalTo(1).projectSecond(0, 1);

    // Join with the solution set, then take the minimum of field 1 per value of field 0.
    DataSet<Tuple2<Long, Long>> minimumPerKey =
        dependencies.join(iteration.getSolutionSet()).where(0).equalTo(0)
            .with(new Join222())
            .groupBy(0).aggregate(Aggregations.MIN, 1);

    DataSet<Tuple2<Long, Long>> updates =
        minimumPerKey.join(iteration.getSolutionSet()).where(0).equalTo(0)
            .flatMap(new FlatMapJoin());

    // The same set is passed as solution set delta and as next workset.
    return iteration.closeWith(updates, updates);
}
代码示例来源:origin: apache/flink
/**
 * Checks the ship strategies in the optimized plan of a tuple reduce that is grouped
 * with a custom partitioner: FORWARD into the sink, PARTITION_CUSTOM into the reducer
 * and FORWARD into the combiner.
 */
@Test
public void testCustomPartitioningTupleReduce() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Single-element input, rebalanced with parallelism 4.
        Tuple2<Integer, Integer> seed = new Tuple2<Integer, Integer>(0, 0);
        DataSet<Tuple2<Integer, Integer>> data = env.fromElements(seed)
            .rebalance().setParallelism(4);

        DataSet<Tuple2<Integer, Integer>> reduced = data
            .groupBy(0).withPartitioner(new TestPartitionerInt())
            .reduce(new SelectOneReducer<Tuple2<Integer, Integer>>());
        reduced.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan optimized = compileNoStats(plan);

        // Walk backwards from the sink: sink <- reducer <- combiner.
        SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
        SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getInput().getSource();
        SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();

        assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.PARTITION_CUSTOM, reduceNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.FORWARD, combineNode.getInput().getShipStrategy());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
/**
 * Checks the ship strategies in the optimized plan of a tuple sum aggregation that is
 * grouped with a custom partitioner: FORWARD into the sink, PARTITION_CUSTOM into the
 * reducer and FORWARD into the combiner.
 */
@Test
public void testCustomPartitioningTupleAgg() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Single-element input, rebalanced with parallelism 4.
        Tuple2<Integer, Integer> seed = new Tuple2<Integer, Integer>(0, 0);
        DataSet<Tuple2<Integer, Integer>> data = env.fromElements(seed)
            .rebalance().setParallelism(4);

        DataSet<Tuple2<Integer, Integer>> summed = data
            .groupBy(0).withPartitioner(new TestPartitionerInt())
            .sum(1);
        summed.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan optimized = compileNoStats(plan);

        // Walk backwards from the sink: sink <- reducer <- combiner.
        SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
        SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getInput().getSource();
        SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();

        assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.PARTITION_CUSTOM, reduceNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.FORWARD, combineNode.getInput().getShipStrategy());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
tupleDs.groupBy(2).first(1);
} catch (Exception e) {
Assert.fail();
tupleDs.groupBy(1, 3).first(10);
} catch (Exception e) {
Assert.fail();
tupleDs.groupBy(0).first(0);
Assert.fail();
} catch (InvalidProgramException ipe) {
tupleDs.groupBy(2).first(-1);
Assert.fail();
} catch (InvalidProgramException ipe) {
代码示例来源:origin: apache/flink
/**
 * Tests ReduceCombineDriver and ReduceDriver: the grouped reduce must yield the same
 * sorted result with object reuse enabled and with object reuse disabled.
 *
 * @param env The execution environment whose object-reuse setting is toggled.
 * @throws Exception If collecting a result fails.
 */
public void testGroupedReduce(ExecutionEnvironment env) throws Exception {
    LOG.info("Testing grouped reduce");

    // Run 1: object reuse switched on.
    env.getConfig().enableObjectReuse();
    List<Tuple2<IntValue, IntValue>> withReuse = getDataSet(env)
        .groupBy(0)
        .reduce(new OverwriteObjectsReduce(true))
        .collect();
    Collections.sort(withReuse, comparator);

    // Run 2: object reuse switched off.
    env.getConfig().disableObjectReuse();
    List<Tuple2<IntValue, IntValue>> withoutReuse = getDataSet(env)
        .groupBy(0)
        .reduce(new OverwriteObjectsReduce(true))
        .collect();
    Collections.sort(withoutReuse, comparator);

    Assert.assertThat(withoutReuse, is(withReuse));
}
代码示例来源:origin: apache/flink
/**
 * Convenience method that applies {@code Aggregations.MAX} to the given field.
 *
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the max'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> max(int field) {
    // The call location name is obtained directly in this method and passed along.
    final String callLocation = Utils.getCallLocationName();
    return this.aggregate(Aggregations.MAX, field, callLocation);
}
代码示例来源:origin: apache/flink
/**
 * Checks the ship strategies in the optimized plan of a POJO reduce that is grouped on
 * field "a" with a custom partitioner: FORWARD into the sink, PARTITION_CUSTOM into the
 * reducer and FORWARD into the combiner.
 */
@Test
public void testCustomPartitioningTupleReduce() {
    try {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Single-element input, rebalanced with parallelism 4.
        Pojo2 seed = new Pojo2();
        DataSet<Pojo2> data = env.fromElements(seed)
            .rebalance().setParallelism(4);

        DataSet<Pojo2> reduced = data
            .groupBy("a").withPartitioner(new TestPartitionerInt())
            .reduce(new SelectOneReducer<Pojo2>());
        reduced.output(new DiscardingOutputFormat<Pojo2>());

        Plan plan = env.createProgramPlan();
        OptimizedPlan optimized = compileNoStats(plan);

        // Walk backwards from the sink: sink <- reducer <- combiner.
        SinkPlanNode sinkNode = optimized.getDataSinks().iterator().next();
        SingleInputPlanNode reduceNode = (SingleInputPlanNode) sinkNode.getInput().getSource();
        SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();

        assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.PARTITION_CUSTOM, reduceNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.FORWARD, combineNode.getInput().getShipStrategy());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
/**
 * Registers a first-n step for the given operation.
 *
 * <p>The parent set is looked up by {@code info.parentID} and the result is stored under
 * {@code info.setID}. For grouped parents an additional {@code KeyDiscarder} map step
 * follows the first-n step. If the parent is neither a DataSet nor a grouping, nothing
 * is registered.
 *
 * @param info Operation description providing parent ID, set ID, count and parallelism.
 */
private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
    // Plain DataSet parent: first-n only.
    if (sets.isDataSet(info.parentID)) {
        DataSet<byte[]> parent = sets.getDataSet(info.parentID);
        sets.add(info.setID, parent
            .first(info.count).setParallelism(info.parallelism).name("First"));
        return;
    }

    // Unsorted grouping parent: first-n followed by the key-discarding map.
    if (sets.isUnsortedGrouping(info.parentID)) {
        UnsortedGrouping<Tuple2<K, byte[]>> unsorted = sets.getUnsortedGrouping(info.parentID);
        sets.add(info.setID, unsorted
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
        return;
    }

    // Sorted grouping parent: same two steps as for the unsorted grouping.
    if (sets.isSortedGrouping(info.parentID)) {
        SortedGrouping<Tuple2<K, byte[]>> sorted = sets.getSortedGrouping(info.parentID);
        sets.add(info.setID, sorted
            .first(info.count).setParallelism(info.parallelism).name("First")
            .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
    }
}
代码示例来源:origin: apache/flink
/**
 * Applies a reduce operation to an unsorted grouping in two steps: a non-combinable
 * {@code IdentityGroupReduce} pre-step followed by a {@code PythonMapPartition}.
 *
 * @param op1 The grouping the operation is applied to.
 * @param info Operation description providing parallelism, environment ID, set ID and name.
 * @param type Type information handed to the {@code PythonMapPartition}.
 * @return The DataSet produced by the map-partition step.
 */
private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
    // Pre-step: identity group reduce with combining switched off.
    DataSet<IN> preStep = op1
        .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep");

    // Main step: map-partition named after the operation.
    return preStep
        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
        .setParallelism(info.parallelism).name(info.name);
}
}
代码示例来源:origin: apache/flink
/**
 * Expects an {@link InvalidProgramException} when a grouped data set is sorted on
 * tuple field 2, which is declared as {@code CustomType}.
 */
@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields4() {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> input =
        environment.fromCollection(tupleWithCustomData, tupleWithCustomInfo);

    // This call is expected to fail.
    input.groupBy(0)
        .sortGroup(2, Order.ASCENDING);
}
代码示例来源:origin: apache/flink
/**
 * Creates the program plan of a word-count job: read text, tokenize, group on field 0,
 * sum field 1 and write the result as CSV.
 *
 * @param inFile Text file to read.
 * @param outFile File the CSV result is written to.
 * @param parallelism Parallelism set on the execution environment.
 * @return The program plan created by the environment.
 */
private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);

    String inputPath = inFile.getAbsolutePath();
    env.readTextFile(inputPath)
        .flatMap(new Tokenizer())
        .groupBy(0)
        .sum(1)
        .writeAsCsv(outFile.getAbsolutePath());

    return env.createProgramPlan();
}
}
代码示例来源:origin: apache/flink
/**
 * Runs a word count over {@code WC} objects: the text is tokenized, grouped on the
 * nested field "complex.someTest" and reduced by adding up the counts.
 *
 * @throws Exception If the job execution fails.
 */
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the input and turn each line into WC records.
    DataSet<WC> words = env.readTextFile(textPath)
        .flatMap(new Tokenizer());

    DataSet<WC> counts = words
        .groupBy("complex.someTest")
        .reduce(new ReduceFunction<WC>() {
            private static final long serialVersionUID = 1L;

            public WC reduce(WC left, WC right) {
                // Keep the key of the first record and add both counts.
                return new WC(left.complex.someTest, left.count + right.count);
            }
        });

    counts.writeAsText(resultPath);
    env.execute("WordCount with custom data types example");
}
代码示例来源:origin: apache/flink
/**
 * Convenience method that applies {@code Aggregations.SUM} to the given field.
 *
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum(int field) {
    // The call location name is obtained directly in this method and passed along.
    final String callLocation = Utils.getCallLocationName();
    return this.aggregate(Aggregations.SUM, field, callLocation);
}
内容来源于网络,如有侵权,请联系作者删除!