This article collects code examples for the Java class org.apache.flink.api.java.operators.MapOperator and shows how the class is used in practice. The examples are drawn from curated open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the MapOperator class:
Package: org.apache.flink.api.java.operators
Class name: MapOperator
This operator represents the application of a "map" function on a data set, and the result data set produced by the function.
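Before the project excerpts, here is a minimal, self-contained sketch of how a MapOperator is obtained and configured. It is illustrative, not taken from the projects below; user code rarely names MapOperator directly, because it is simply the type returned by DataSet#map:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MapOperatorExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // DataSet#map returns a MapOperator, which can then be configured
        // with name(), setParallelism(), returns(), withBroadcastSet(), etc.
        DataSet<Long> doubled = env.generateSequence(1, 5)
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    return value * 2;
                }
            })
            .name("Double values")
            .setParallelism(2);

        doubled.print(); // prints 2, 4, 6, 8, 10 (order may vary)
    }
}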
Code example source: apache/flink
.returns(returnType)
.setParallelism(parallelism)
.name("Translate edge IDs");
Code example source: apache/flink
.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
Code example source: apache/flink
/**
* Count the number of elements in a DataSet.
*
* @param input DataSet of elements to be counted
* @param <T> element type
* @return count
*/
public static <T> DataSet<LongValue> count(DataSet<T> input) {
return input
.map(new MapTo<>(new LongValue(1)))
.returns(LONG_VALUE_TYPE_INFO)
.name("Emit 1")
.reduce(new AddLongValue())
.name("Sum");
}
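A hedged usage sketch for the count helper above, assuming the method is in scope (it matches Gelly's GraphUtils.count); the data set here is illustrative:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> words = env.fromElements("apache", "flink", "gelly");

// count() emits a single LongValue holding the number of elements.
long n = count(words).collect().get(0).getValue(); // 3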
Code example source: apache/flink
/**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
* @param returnType the explicit return type.
* @return a new graph
*/
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
private Vertex<K, NV> output = new Vertex<>();
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
output.f0 = value.f0;
output.f1 = mapper.map(value);
return output;
}
})
.returns(returnType)
.withForwardedFields("f0")
.name("Map vertices");
return new Graph<>(mappedVertices, this.edges, this.context);
}
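The withForwardedFields("f0") hint above tells the optimizer that the vertex ID (field f0) passes through the map unchanged, so existing partitioning and sort orders on the ID can be reused. A hedged usage sketch for this overload; the graph and the Long-to-String mapping are illustrative:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Double> graph = Graph.fromDataSet(
    env.fromElements(new Vertex<>(1L, 10L), new Vertex<>(2L, 20L)),
    env.fromElements(new Edge<>(1L, 2L, 0.5)),
    env);

// Map Long vertex values to Strings, supplying the return type explicitly.
Graph<Long, String, Double> mapped = graph.mapVertices(
    vertex -> "v" + vertex.getValue(),
    TypeInformation.of(new TypeHint<Vertex<Long, String>>() {}));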
Code example source: apache/flink
private void createPrintSink(PythonOperationInfo info) {
DataSet<byte[]> parent = sets.getDataSet(info.parentID);
parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
.output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}
Code example source: apache/flink
private void createCsvSink(PythonOperationInfo info) {
DataSet<byte[]> parent = sets.getDataSet(info.parentID);
parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}
Code example source: apache/flink
.withForwardedFields("*").setParallelism(p).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
Code example source: apache/flink
@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
DataSet<Edge<K, EV>> edges = graph.getEdges();
if (hasNullValueEdges(edges)) {
return edges
.map(new EdgeToTuple2Map<>())
.name("Edge to Tuple2")
.setParallelism(parallelism.getValue().intValue());
} else {
return edges;
}
}
Code example source: apache/flink
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc1 = env.generateSequence(1, 5);
DataSet<Long> bc2 = env.generateSequence(6, 10);
List<Long> result = input
.map(new Mapper())
.withBroadcastSet(bc1.union(bc2), BC_NAME)
.reduce(new Reducer())
.collect();
Assert.assertEquals(Long.valueOf(3025), result.get(0));
}
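The Mapper and Reducer classes are not included in the excerpt. Given the asserted result, 3025 = 55 × 55 (the input sum times the broadcast sum), a hypothetical Mapper consistent with the test would scale each element by the sum of the broadcast variable:

// Hypothetical sketch, not the actual test class: reads the broadcast set
// registered under BC_NAME and multiplies each input element by its sum.
public static class Mapper extends RichMapFunction<Long, Long> {
    private long broadcastSum;

    @Override
    public void open(Configuration parameters) {
        broadcastSum = 0;
        for (Long v : getRuntimeContext().<Long>getBroadcastVariable(BC_NAME)) {
            broadcastSum += v;
        }
    }

    @Override
    public Long map(Long value) {
        return value * broadcastSum;
    }
}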
Code example source: apache/flink
private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
for (int i = 0; i < branches; i++) {
final int ii = i;
if (stats != null) {
input = input.map(
new RichMapFunction<String, String>() {
@Override
public String map(String value) {
return value;
}
}).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
}
DataSet<String> branch = input
.map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))
.groupBy(0)
.minBy(1)
.map(kv -> kv.f1).returns(Types.STRING);
if (stats == null) {
stats = branch;
} else {
stats = stats.union(branch);
}
}
return stats.map(s -> "(" + s + ").stats");
}
Code example source: apache/flink
.getEdges()
.map(new ExtractEdgeIDs<>())
.setParallelism(parallelism)
.name("Extract edge IDs");

.setParallelism(parallelism)
.name("Initial scores")
.groupBy(0)
.reduce(new SumScores<>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
.setParallelism(parallelism)
.name("Sum");

.setParallelism(parallelism)
.name("Square")
.reduce(new Sum())
.setParallelism(parallelism)
.name("Sum");

.closeWith(passThrough)
.map(new TranslateResult<>())
.setParallelism(parallelism)
.name("Map result");
Code example source: apache/flink
@Test
public void testBranchesOnlyInBCVariables1() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(100);
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc_input = env.generateSequence(1, 10);
input
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
.output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
compileNoStats(plan);
}
catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
}
}
Code example source: org.gradoop/gradoop-flink
.map(label -> Tuple2.of(label, env.readTextFile(getGraphHeadCSVPath(label))
    .map(new CSVLineToGraphHead(graphHeadFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(graphHead -> graphHead.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(label -> Tuple2.of(label, env.readTextFile(getVertexCSVPath(label))
    .map(new CSVLineToVertex(vertexFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(vertex -> vertex.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(label -> Tuple2.of(label, env.readTextFile(getEdgeCSVPath(label))
    .map(new CSVLineToEdge(edgeFactory))
    .withBroadcastSet(metaDataBroadcast, BC_METADATA)
    .filter(edge -> edge.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
Code example source: apache/flink
case IN:
return edges.map(new ProjectVertexWithEdgeValueMap<>(1))
.withForwardedFields("f1->f0")
.name("Vertex with in-edges")
.groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
.name("Reduce on edges");
case OUT:
return edges.map(new ProjectVertexWithEdgeValueMap<>(0))
.withForwardedFields("f0->f0")
.name("Vertex with out-edges")
.groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
.name("Reduce on edges");
case ALL:
Code example source: apache/flink
@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
// s, t, (d(s), d(t))
DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
.run(new EdgeDegreePair<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
.setParallelism(parallelism));
// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
.map(new EdgeStats<>())
.setParallelism(parallelism)
.name("Edge stats")
.groupBy(0)
.reduce(new SumEdgeStats<>())
.setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Sum edge stats");
edgeMetricsHelper = new EdgeMetricsHelper<>();
edgeStats
.output(edgeMetricsHelper)
.setParallelism(parallelism)
.name("Edge metrics");
return this;
}
Code example source: apache/flink
@Test
public void testBranchBeforeIteration() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Long> source1 = env.generateSequence(0,1);
DataSet<Long> source2 = env.generateSequence(0,1);
IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
DataSet<Long> loopRes = loopHead.closeWith(loopTail);
DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
map.output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
try {
compileNoStats(plan);
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
Code example source: dbs-leipzig/gradoop
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig config) {
    return config.getExecutionEnvironment()
        .readTextFile(path)
        .map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
        .map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
        .returns(new TypeHint<Tuple3<String, String, String>>() {});
}
Code example source: apache/flink
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.rebalance()
.groupBy(0).reduce(new KMeans.CentroidAccumulator())
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
Code example source: apache/flink
@Test
public void testUnaryOp() {
try {
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
DataSet<String> bcData = env.fromElements(SUFFIX);
List<String> result = new ArrayList<String>();
env.fromElements(TEST_DATA)
.map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME)
.output(new LocalCollectionOutputFormat<String>(result));
env.execute();
assertEquals(TEST_DATA.length, result.size());
for (String s : result) {
assertTrue(s.indexOf(SUFFIX) > 0);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
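SuffixAppender is not shown in the excerpt. A hypothetical version consistent with the assertion s.indexOf(SUFFIX) > 0 would append the broadcast suffix to each element:

// Hypothetical sketch, not the actual test class: reads the single-element
// broadcast set registered under BC_VAR_NAME and appends it to each value.
public static class SuffixAppender extends RichMapFunction<String, String> {
    private String suffix;

    @Override
    public void open(Configuration parameters) {
        suffix = getRuntimeContext().<String>getBroadcastVariable(BC_VAR_NAME).get(0);
    }

    @Override
    public String map(String value) {
        return value + suffix;
    }
}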
Code example source: apache/flink
DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
in1 = in1.map(new MockMapper()).withForwardedFields("*")
.groupBy(0)
.reduce(new MockReducer()).withForwardedFields("f0->f1");
in2 = in2.map(new MockMapper()).withForwardedFields("*")
.groupBy(1)
.reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());