This article collects code examples of the Java method org.apache.flink.api.java.operators.UnsortedGrouping.aggregate() and shows how UnsortedGrouping.aggregate() is used in practice. The examples were extracted from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful references. Details of the UnsortedGrouping.aggregate() method:
Package: org.apache.flink.api.java.operators
Class: UnsortedGrouping
Method: aggregate
Applies an Aggregate transformation on a grouped org.apache.flink.api.java.tuple.Tuple DataSet.
Note: Only Tuple DataSets can be aggregated. The transformation applies a built-in Aggregations function on a specified field of a Tuple group. Additional aggregation functions can be added to the resulting AggregateOperator by calling AggregateOperator#and(Aggregations, int).
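Before the extracted examples, a minimal self-contained sketch of the basic pattern may help. This is not from the projects below; the class name and sample data are invented for illustration:

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class AggregateSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // made-up (word, count) pairs
        DataSet<Tuple2<String, Integer>> pairs = env.fromElements(
                new Tuple2<>("a", 1),
                new Tuple2<>("a", 2),
                new Tuple2<>("b", 7));

        // groupBy(0) yields an UnsortedGrouping; aggregate(SUM, 1) sums field 1 per group
        List<Tuple2<String, Integer>> result = pairs
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1)
                .collect();

        System.out.println(result); // (a,3) and (b,7), in no particular order
    }
}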
Code example source: apache/flink
/**
 * Syntactic sugar for aggregate (MAX, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the max'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> max(int field) {
    return this.aggregate(Aggregations.MAX, field, Utils.getCallLocationName());
}
Code example source: apache/flink
/**
 * Syntactic sugar for aggregate (MIN, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the min'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> min(int field) {
    return this.aggregate(Aggregations.MIN, field, Utils.getCallLocationName());
}
Code example source: apache/flink
/**
 * Syntactic sugar for aggregate (SUM, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum(int field) {
    return this.aggregate(Aggregations.SUM, field, Utils.getCallLocationName());
}
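A short usage sketch of the three shortcuts above (class name and data invented for illustration); each call is equivalent to the aggregate(...) form noted in its Javadoc:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ShortcutSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // made-up (key, value) pairs
        DataSet<Tuple2<String, Integer>> pairs = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 7));

        // each shortcut delegates to aggregate(...) as shown in the sources above
        pairs.groupBy(0).sum(1).print(); // aggregate(SUM, 1): (a,3), (b,7)
        pairs.groupBy(0).min(1).print(); // aggregate(MIN, 1): (a,1), (b,7)
        pairs.groupBy(0).max(1).print(); // aggregate(MAX, 1): (a,2), (b,7)
    }
}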
Code example source: apache/flink
/**
 * Applies an Aggregate transformation on a grouped {@link org.apache.flink.api.java.tuple.Tuple} {@link DataSet}.
 *
 * <p><b>Note: Only Tuple DataSets can be aggregated.</b>
 * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field
 * of a Tuple group. Additional aggregation functions can be added to the resulting
 * {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}.
 *
 * @param agg The built-in aggregation function that is computed.
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the aggregated DataSet.
 *
 * @see org.apache.flink.api.java.tuple.Tuple
 * @see Aggregations
 * @see AggregateOperator
 * @see DataSet
 */
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
    return aggregate(agg, field, Utils.getCallLocationName());
}
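The Javadoc above mentions AggregateOperator#and(Aggregations, int). The following sketch (hypothetical class name, invented data) chains a second aggregation so that both run over the same grouping in a single pass:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple3;

public class ChainedAggregateSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // made-up (key, amount, timestamp) triples
        DataSet<Tuple3<String, Integer, Long>> items = env.fromElements(
                new Tuple3<>("a", 10, 3L),
                new Tuple3<>("a", 20, 1L),
                new Tuple3<>("b", 5, 2L));

        // one grouped pass: sum field 1 and, via and(), also take the min of field 2
        items.groupBy(0)
                .aggregate(Aggregations.SUM, 1)
                .and(Aggregations.MIN, 2)
                .print(); // (a,30,1), (b,5,2)
    }
}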
Code example source: apache/flink
@Override
protected void testProgram() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.fromElements(WordCountData.TEXT);
    DataSet<Tuple2<String, Integer>> words = text.flatMap(new WordCount.Tokenizer());
    DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);

    result.output(new LocalCollectionOutputFormat<Tuple2<String, Integer>>(resultsCollected));
    env.execute("Word Count Collection");
}
Code example source: apache/flink
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: WordCount <input path> <result path>");
        return;
    }

    final String inputPath = args[0];
    final String outputPath = args[1];

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Set up the Hadoop Input Format
    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
    TextInputFormat.addInputPath(job, new Path(inputPath));

    // Create a Flink job with it
    DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

    // Tokenize the line and convert from Writable "Text" to String for better handling
    DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

    // Sum up the words
    DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);

    // Convert String back to Writable "Text" for use with Hadoop Output Format
    DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());

    // Set up Hadoop Output Format
    HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
    hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
    TextOutputFormat.setOutputPath(job, new Path(outputPath));

    // Output & Execute
    hadoopResult.output(hadoopOutputFormat);
    env.execute("Word Count");
}
Code example source: apache/flink
DataSet<Tuple3<Double, StringValue, Long>> initialData =
        env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));

initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
Code example source: apache/flink
@Test
public void testGroupedAggregate() throws Exception {
    /*
     * Grouped Aggregate
     */

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

    DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
            .aggregate(Aggregations.SUM, 0)
            .project(1, 0);

    List<Tuple2<Long, Integer>> result = aggregateDs.collect();

    String expected = "1,1\n" +
            "2,5\n" +
            "3,15\n" +
            "4,34\n" +
            "5,65\n" +
            "6,111\n";

    compareResultAsTuples(result, expected);
}
Code example source: apache/flink
@Test
public void testGroupedAggregateOfMutableValueTypes() throws Exception {
    /*
     * Grouped Aggregate of mutable value types
     */

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);

    DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
            .aggregate(Aggregations.SUM, 0)
            .project(1, 0);

    List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();

    String expected = "1,1\n" +
            "2,5\n" +
            "3,15\n" +
            "4,34\n" +
            "5,65\n" +
            "6,111\n";

    compareResultAsTuples(result, expected);
}
Code example source: apache/flink
@Test
public void testNestedAggregateOfMutableValueTypes() throws Exception {
    /*
     * Nested Aggregate of mutable value types
     */

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);

    DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
            .aggregate(Aggregations.MIN, 0)
            .aggregate(Aggregations.MIN, 0)
            .project(0);

    List<Tuple1<IntValue>> result = aggregateDs.collect();

    String expected = "1\n";

    compareResultAsTuples(result, expected);
}
Code example source: apache/flink
@Test
public void testNestedAggregate() throws Exception {
    /*
     * Nested Aggregate
     */

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

    DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
            .aggregate(Aggregations.MIN, 0)
            .aggregate(Aggregations.MIN, 0)
            .project(0);

    List<Tuple1<Integer>> result = aggregateDs.collect();

    String expected = "1\n";

    compareResultAsTuples(result, expected);
}
Code example source: apache/flink
public static DataSet<Tuple2<Long, Double>> constructPlan(DataSet<Tuple2<Long, Double>> initialData, int numIterations) {
    DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = initialData.iterateDelta(initialData, numIterations, 0);

    DataSet<Tuple2<Long, Double>> delta = iteration.getSolutionSet()
            .join(iteration.getWorkset().flatMap(new Duplicator())).where(0).equalTo(0).with(new SummingJoin()).name(JOIN_1)
            .groupBy(0).aggregate(Aggregations.MIN, 1).map(new Expander())
            .join(iteration.getSolutionSet()).where(0).equalTo(0).with(new SummingJoinProject()).name(JOIN_2);

    DataSet<Tuple2<Long, Double>> changes = delta.groupBy(0).aggregate(Aggregations.SUM, 1);

    DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, changes);
    return result;
}
Code example source: apache/flink
public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {

    // open a bulk iteration
    IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);

    DataSet<Tuple2<Long, Long>> changes = iteration
            .join(edges).where(0).equalTo(0).with(new Join222())
            .groupBy(0).aggregate(Aggregations.MIN, 1)
            .join(iteration).where(0).equalTo(0)
            .flatMap(new FlatMapJoin());

    // close the bulk iteration
    return iteration.closeWith(changes);
}
Code example source: apache/flink
// Excerpt from a delta-iteration step function: take the minimum candidate
// component id per vertex, then check it against the current solution set.
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .with(new ComponentIdFilter());
Code example source: apache/flink
// Excerpt: join each neighbor with its current component id, then keep the minimum id per vertex.
        .with(new ConnectedComponents.NeighborWithComponentIDJoin())
        .groupBy(0).aggregate(Aggregations.MIN, 1)
Code example source: apache/flink
public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {

    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);

    DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
            .projectSecond(1);

    DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());

    DataSet<Tuple2<Long, Long>> candidatesDependencies =
            grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);

    DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
            candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
                    .with(new Join222())
                    .groupBy(0).aggregate(Aggregations.MIN, 1);

    DataSet<Tuple2<Long, Long>> updatedComponentId =
            verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
                    .flatMap(new FlatMapJoin());

    DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);

    return depResult;
}
Code example source: apache/flink
@Override
protected void testProgram() throws Exception {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read vertex and edge data
    DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);

    DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
            .flatMap(new ConnectedComponents.UndirectEdge());

    // assign the initial components (equal to the vertex id)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());

    // open a delta iteration
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

    // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
    DataSet<Tuple2<Long, Long>> minNeighbor = iteration.getWorkset()
            .join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
            .groupBy(0).aggregate(Aggregations.MIN, 1);

    DataSet<Tuple2<Long, Long>> updatedIds = iteration.getSolutionSet()
            .join(minNeighbor).where(0).equalTo(0).with(new UpdateComponentIdMatchMirrored());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(updatedIds, updatedIds);

    result.writeAsCsv(resultPath, "\n", " ");

    // execute program
    env.execute("Connected Components Example");
}
Code example source: apache/flink
@Override
protected void testProgram() throws Exception {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read vertex and edge data
    DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);

    DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
            .flatMap(new ConnectedComponents.UndirectEdge());

    // assign the initial components (equal to the vertex id)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

    // open a delta iteration
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

    // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
    DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
            .groupBy(0).aggregate(Aggregations.MIN, 1)
            .join(iteration.getSolutionSet()).where(0).equalTo(0)
            .with(new ConnectedComponents.ComponentIdFilter());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

    result.writeAsCsv(resultPath, "\n", " ");

    // execute program
    env.execute("Connected Components Example");
}
Code example source: apache/flink
@Override
protected void testProgram() throws Exception {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read vertex and edge data
    DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);

    DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
            .flatMap(new UndirectEdge());

    // assign the initial components (equal to the vertex id)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());

    // open a delta iteration
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
    iteration.setSolutionSetUnManaged(true);

    // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
    DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
            .groupBy(0).aggregate(Aggregations.MIN, 1)
            .join(iteration.getSolutionSet()).where(0).equalTo(0)
            .with(new ComponentIdFilter());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

    result.writeAsCsv(resultPath, "\n", " ");

    // execute program
    env.execute("Connected Components Example");
}
Code example source: apache/flink
private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {

    env.setParallelism(PARALLELISM);
    env.getConfig().disableSysoutLogging();

    // read vertex and edge data
    DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env)
            .rebalance();

    DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env)
            .rebalance()
            .flatMap(new ConnectedComponents.UndirectEdge());

    // assign the initial components (equal to the vertex id)
    DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices
            .map(new ConnectedComponents.DuplicateValue<Long>());

    // open a delta iteration
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
            verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

    // apply the step logic: join with the edges, select the minimum neighbor,
    // update if the component of the candidate is smaller
    DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges)
            .where(0).equalTo(0)
            .with(new ConnectedComponents.NeighborWithComponentIDJoin())
            .groupBy(0).aggregate(Aggregations.MIN, 1)
            .join(iteration.getSolutionSet())
            .where(0).equalTo(0)
            .with(new ConnectedComponents.ComponentIdFilter());

    // close the delta iteration (delta and new workset are identical)
    DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

    result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    env.execute();
}