本文整理了Java中org.apache.flink.api.java.operators.MapOperator.groupBy()
方法的一些代码示例,展示了MapOperator.groupBy()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.groupBy()
方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:groupBy
暂无
代码示例来源:origin: apache/flink
@Test
public void testForcedRebalancing() throws Exception {
	/*
	 * Test forced rebalancing
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// generate some numbers in parallel
	DataSet<Long> sequence = env.generateSequence(1, 3000);

	DataSet<Tuple2<Integer, Integer>> countsPerPartition = sequence
			.filter(new Filter1())           // introduce some partition skew by filtering
			.rebalance()                     // force an even redistribution
			.map(new PartitionIndexMapper()) // tag each value with its partition index
			.groupBy(0)
			.reduce(new Reducer1())          // count values in each partition
			.map(new Mapper1());             // round counts to mitigate runtime scheduling effects (lazy split assignment)

	List<Tuple2<Integer, Integer>> result = countsPerPartition.collect();

	// every partition is expected to hold the same (rounded) number of elements
	final int parallelism = env.getParallelism();
	final int numPerPartition = 2220 / parallelism / 10;
	StringBuilder expected = new StringBuilder();
	for (int partition = 0; partition < parallelism; partition++) {
		expected.append('(').append(partition).append(',')
				.append(numPerPartition).append(")\n");
	}

	compareResultAsText(result, expected.toString());
}
代码示例来源:origin: apache/flink
.withForwardedFields("f1->f0")
.name("Vertex with in-edges")
.groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
.name("Reduce on edges");
case OUT:
.withForwardedFields("f0->f0")
.name("Vertex with out-edges")
.groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
.name("Reduce on edges");
case ALL:
代码示例来源:origin: apache/flink
return edges.map(new ProjectVertexIdMap<>(1)).name("Vertex ID")
.withForwardedFields("f1->f0")
.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
.name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
.withForwardedFields("f0")
.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
.name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
代码示例来源:origin: apache/flink
@Test
public void testFirstNOnGroupedDS() throws Exception {
	/*
	 * First-n on grouped data set
	 */
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.get3TupleDataSet(env);

	// keep at most the first 4 elements per group (keyed on field 1),
	// then count how many elements survived per key
	DataSet<Tuple2<Long, Integer>> firstPerGroup = tuples
			.groupBy(1)
			.first(4)
			.map(new OneMapper2())
			.groupBy(0)
			.sum(1);

	List<Tuple2<Long, Integer>> result = firstPerGroup.collect();

	String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
	compareResultAsText(result, expected);
}
代码示例来源:origin: apache/flink
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
		throws Exception {
	super.run(input);

	// annotate every edge with the degrees of both endpoints: s, t, (d(s), d(t))
	DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> degreeAnnotatedEdges = input
		.run(new EdgeDegreePair<K, VV, EV>()
			.setReduceOnTargetId(reduceOnTargetId)
			.setParallelism(parallelism));

	// per-vertex aggregation: s, d(s), count of (u, v) where
	// deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
	DataSet<Tuple3<K, LongValue, LongValue>> aggregatedStats = degreeAnnotatedEdges
		.map(new EdgeStats<>())
		.setParallelism(parallelism)
		.name("Edge stats")
		.groupBy(0)
		.reduce(new SumEdgeStats<>())
		.setCombineHint(CombineHint.HASH)
		.setParallelism(parallelism)
		.name("Sum edge stats");

	// the helper output format collects the final metrics when the job runs
	edgeMetricsHelper = new EdgeMetricsHelper<>();

	aggregatedStats
		.output(edgeMetricsHelper)
		.setParallelism(parallelism)
		.name("Edge metrics");

	return this;
}
代码示例来源:origin: apache/flink
.groupBy(0).reduce(new CentroidAccumulator())
代码示例来源:origin: apache/flink
.setParallelism(parallelism)
.name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
.setParallelism(parallelism)
.name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
.setParallelism(parallelism)
代码示例来源:origin: apache/flink
/**
 * Fans {@code input} out into {@code branches} analysis branches and unions
 * their results.
 *
 * <p>On every iteration after the first, the stats accumulated so far are
 * broadcast into an identity mapper over {@code input} before the next branch
 * is derived. Each branch tags every element with the branch index, keys all
 * elements into a single group, and keeps only the minimum element.
 *
 * @param input    the records to analyze
 * @param stats    previously accumulated stats, or {@code null} on the first call
 * @param branches the number of analysis branches to create
 * @return the union of all branch results, each mapped through {@code "(...).stats"}
 */
private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
	for (int i = 0; i < branches; i++) {
		final int ii = i;

		if (stats != null) {
			// broadcast the accumulated stats into an (identity) mapper
			input = input.map(
				new RichMapFunction<String, String>() {
					@Override
					public String map(String value) {
						return value;
					}
				}).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
		}

		DataSet<String> branch = input
			// the tuple produced is (Integer, String) — constant key 0 plus the
			// tagged value — so the type hint must be TUPLE(INT, STRING); the
			// original hint TUPLE(STRING, INT) had the fields swapped and
			// contradicts the later `.map(kv -> kv.f1).returns(Types.STRING)`
			.map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))
			.groupBy(0)
			.minBy(1)
			.map(kv -> kv.f1).returns(Types.STRING);

		if (stats == null) {
			stats = branch;
		} else {
			stats = stats.union(branch);
		}
	}
	return stats.map(s -> "(" + s + ").stats");
}
}
代码示例来源:origin: apache/flink
.groupBy(0).reduce(new KMeans.CentroidAccumulator())
代码示例来源:origin: apache/flink
DataSet<Tuple3<Integer, Integer, Integer>> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
set = set.map(new MockMapper()).withForwardedFields("*")
.groupBy(0)
.reduce(new MockReducer()).withForwardedFields("f0->f1")
.map(new MockMapper()).withForwardedFields("*")
.groupBy(1)
.reduce(new MockReducer()).withForwardedFields("*");
代码示例来源:origin: apache/flink
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
代码示例来源:origin: apache/flink
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
代码示例来源:origin: apache/flink
DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
in1 = in1.map(new MockMapper()).withForwardedFields("*")
.groupBy(0)
.reduce(new MockReducer()).withForwardedFields("f0->f1");
in2 = in2.map(new MockMapper()).withForwardedFields("*")
.groupBy(1)
.reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
代码示例来源:origin: apache/flink
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
代码示例来源:origin: apache/flink
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
代码示例来源:origin: apache/flink
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");
代码示例来源:origin: apache/flink
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
代码示例来源:origin: apache/flink
.groupBy(0).reduce(new KMeans.CentroidAccumulator())
代码示例来源:origin: apache/flink
/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 * |--------------------------/ /
 * |--------------------------------------------/
 *
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
	// build the plan: a map/reduce chain crossed twice against the source
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(DEFAULT_PARALLELISM);

	DataSet<Long> source = env.generateSequence(0, 1);

	source.map(new IdentityMapper<Long>()).name("Map1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
			.cross(source).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
			.cross(source).with(new IdentityCrosser<Long>()).name("Cross2")
			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
			.output(new DiscardingOutputFormat<Long>()).name("Sink");

	// compile and translate to a job graph; the test passes if no exception is thrown
	Plan plan = env.createProgramPlan();
	OptimizedPlan optimizedPlan = compileNoStats(plan);
	new JobGraphGenerator().compileJobGraph(optimizedPlan);
}
}
代码示例来源:origin: apache/flink
.groupBy(0).sum(1).name("Count Words")
内容来源于网络,如有侵权,请联系作者删除!