本文整理了Java中org.apache.flink.api.java.operators.MapOperator.map()
方法的一些代码示例,展示了MapOperator.map()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。MapOperator.map()
方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:map
暂无
代码示例来源:origin: apache/flink
@Test
public void testMapWithLambdas() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
DataSet<String> mappedDs = stringDs
.map(Object::toString)
.map(s -> s.replace("1", "2"))
.map(Trade::new)
.map(Trade::toString);
List<String> result = mappedDs.collect();
String expected = "22\n" +
"22\n" +
"23\n" +
"24\n";
compareResultAsText(result, expected);
}
代码示例来源:origin: apache/flink
@Test
public void testBranchesOnlyInBCVariables1() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(100);
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc_input = env.generateSequence(1, 10);
input
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
.output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
compileNoStats(plan);
}
catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
.map(new IdentityMapper<Long>())
.withBroadcastSet(source, "bc");
代码示例来源:origin: apache/flink
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
代码示例来源:origin: apache/flink
source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
.output(new DiscardingOutputFormat<String>());
代码示例来源:origin: apache/flink
@Test
public void testInvalidTypeAccumulator() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
env.generateSequence(0, 10000)
.map(new IncompatibleAccumulatorTypesMapper())
.map(new IncompatibleAccumulatorTypesMapper2())
.output(new DiscardingOutputFormat<>());
try {
env.execute();
fail("Should have failed.");
} catch (JobExecutionException e) {
assertTrue("Root cause should be:",
e.getCause() instanceof Exception);
assertTrue("Root cause should be:",
e.getCause().getCause() instanceof UnsupportedOperationException);
}
}
代码示例来源:origin: apache/flink
.map(new KMeans.CountAppender())
.groupBy(0).reduce(new KMeans.CentroidAccumulator())
代码示例来源:origin: org.gradoop/gradoop-flink
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig
config) {
return config.getExecutionEnvironment()
.readTextFile(path)
.map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
.map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
.returns(new TypeHint<Tuple3<String, String, String>>() {
});
}
代码示例来源:origin: dbs-leipzig/gradoop
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig
config) {
return config.getExecutionEnvironment()
.readTextFile(path)
.map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
.map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
.returns(new TypeHint<Tuple3<String, String, String>>() {
});
}
代码示例来源:origin: dbs-leipzig/gradoop
@Override
public LogicalGraph execute(LogicalGraph graph) {
DistinctVertexDegrees distinctVertexDegrees = new DistinctVertexDegrees(
SamplingAlgorithm.DEGREE_PROPERTY_KEY,
SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY,
SamplingAlgorithm.OUT_DEGREE_PROPERTY_KEY,
true);
DataSet<Vertex> newVertices = distinctVertexDegrees.execute(graph).getVertices()
.filter(new VertexWithDegreeFilter<>(degree, SamplingAlgorithm.DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.OUT_DEGREE_PROPERTY_KEY));
return graph.getConfig().getLogicalGraphFactory().fromDataSets(
graph.getGraphHead(), newVertices, graph.getEdges());
}
}
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* {@inheritDoc}
*/
@Override
public LogicalGraph execute(LogicalGraph graph) {
DistinctVertexDegrees distinctVertexDegrees = new DistinctVertexDegrees(
SamplingAlgorithm.DEGREE_PROPERTY_KEY,
SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY,
SamplingAlgorithm.OUT_DEGREE_PROPERTY_KEY,
true);
DataSet<Vertex> newVertices = distinctVertexDegrees.execute(graph).getVertices()
.filter(new VertexWithDegreeFilter<>(degree, SamplingAlgorithm.DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.IN_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(SamplingAlgorithm.OUT_DEGREE_PROPERTY_KEY));
return graph.getConfig().getLogicalGraphFactory().fromDataSets(
graph.getGraphHead(), newVertices, graph.getEdges());
}
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* Returns the subgraph of the given supergraph that is induced by the
* edges that fulfil the given filter function.
*
* @param superGraph supergraph
* @return edge-induced subgraph
*/
private LG edgeInducedSubgraphProjectFirst(LG superGraph) {
DataSet<E> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);
DataSet<Tuple1<GradoopId>> vertexIdentifiers = filteredEdges
.map(new SourceId<>())
.map(new ObjectTo1<>())
.union(filteredEdges
.map(new TargetId<>())
.map(new ObjectTo1<>()))
.distinct();
DataSet<V> filteredVertices = vertexIdentifiers
.join(superGraph.getVertices())
.where(0).equalTo(new Id<>())
.with(new RightSide<>());
return superGraph.getFactory().fromDataSets(filteredVertices, filteredEdges);
}
代码示例来源:origin: dbs-leipzig/gradoop
/**
* Returns the subgraph of the given supergraph that is induced by the
* edges that fulfil the given filter function.
*
* @param superGraph supergraph
* @return edge-induced subgraph
*/
private LG edgeInducedSubgraphProjectFirst(LG superGraph) {
DataSet<E> filteredEdges = superGraph.getEdges().filter(edgeFilterFunction);
DataSet<Tuple1<GradoopId>> vertexIdentifiers = filteredEdges
.map(new SourceId<>())
.map(new ObjectTo1<>())
.union(filteredEdges
.map(new TargetId<>())
.map(new ObjectTo1<>()))
.distinct();
DataSet<V> filteredVertices = vertexIdentifiers
.join(superGraph.getVertices())
.where(0).equalTo(new Id<>())
.with(new RightSide<>());
return superGraph.getFactory().fromDataSets(filteredVertices, filteredEdges);
}
代码示例来源:origin: org.gradoop/gradoop-flink
@Override
protected GraphCollection executeForVertex(LogicalGraph graph) {
GradoopFlinkConfig config = graph.getConfig();
EPGMGraphHeadFactory<GraphHead> graphHeadFactory = config.getGraphHeadFactory();
EPGMVertexFactory<Vertex> vertexFactory = config.getVertexFactory();
String variable = getQueryHandler().getVertices().iterator().next().getVariable();
DataSet<Vertex> matchingVertices = graph.getVertices()
.filter(new MatchingVertices<>(getQuery()));
if (!doAttachData()) {
matchingVertices = matchingVertices
.map(new Id<>())
.map(new ObjectTo1<>())
.map(new VertexFromId(vertexFactory));
}
DataSet<Tuple2<Vertex, GraphHead>> pairs = matchingVertices
.map(new AddGraphElementToNewGraph<>(graphHeadFactory, variable))
.returns(new TupleTypeInfo<>(
TypeExtractor.getForClass(vertexFactory.getType()),
TypeExtractor.getForClass(graphHeadFactory.getType())));
return config.getGraphCollectionFactory().fromDataSets(
pairs.map(new Value1Of2<>()),
pairs.map(new Value0Of2<>()));
}
代码示例来源:origin: dbs-leipzig/gradoop
@Override
protected GraphCollection executeForVertex(LogicalGraph graph) {
GradoopFlinkConfig config = graph.getConfig();
EPGMGraphHeadFactory<GraphHead> graphHeadFactory = config.getGraphHeadFactory();
EPGMVertexFactory<Vertex> vertexFactory = config.getVertexFactory();
String variable = getQueryHandler().getVertices().iterator().next().getVariable();
DataSet<Vertex> matchingVertices = graph.getVertices()
.filter(new MatchingVertices<>(getQuery()));
if (!doAttachData()) {
matchingVertices = matchingVertices
.map(new Id<>())
.map(new ObjectTo1<>())
.map(new VertexFromId(vertexFactory));
}
DataSet<Tuple2<Vertex, GraphHead>> pairs = matchingVertices
.map(new AddGraphElementToNewGraph<>(graphHeadFactory, variable))
.returns(new TupleTypeInfo<>(
TypeExtractor.getForClass(vertexFactory.getType()),
TypeExtractor.getForClass(graphHeadFactory.getType())));
return config.getGraphCollectionFactory().fromDataSets(
pairs.map(new Value1Of2<>()),
pairs.map(new Value0Of2<>()));
}
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* Verifies that the given graph is consistent, contains only edges that connect to vertices
* within the subgraph.
*
* @param subgraph supergraph
* @return verified subgraph
*/
private LG verify(LG subgraph) {
DataSet<Tuple2<Tuple2<E, V>, V>> verifiedTriples = subgraph.getEdges()
.join(subgraph.getVertices())
.where(new SourceId<>()).equalTo(new Id<>())
.join(subgraph.getVertices())
.where("0.targetId").equalTo(new Id<>());
DataSet<E> verifiedEdges = verifiedTriples
.map(new Value0Of2<>())
.map(new Value0Of2<>());
DataSet<V> verifiedVertices = verifiedTriples
.map(new Value0Of2<>())
.map(new Value1Of2<>())
.union(verifiedTriples.map(new Value1Of2<>()))
.distinct(new Id<>());
return subgraph.getFactory().fromDataSets(verifiedVertices, verifiedEdges);
}
代码示例来源:origin: dbs-leipzig/gradoop
/**
* Verifies that the given graph is consistent, contains only edges that connect to vertices
* within the subgraph.
*
* @param subgraph supergraph
* @return verified subgraph
*/
private LG verify(LG subgraph) {
DataSet<Tuple2<Tuple2<E, V>, V>> verifiedTriples = subgraph.getEdges()
.join(subgraph.getVertices())
.where(new SourceId<>()).equalTo(new Id<>())
.join(subgraph.getVertices())
.where("0.targetId").equalTo(new Id<>());
DataSet<E> verifiedEdges = verifiedTriples
.map(new Value0Of2<>())
.map(new Value0Of2<>());
DataSet<V> verifiedVertices = verifiedTriples
.map(new Value0Of2<>())
.map(new Value1Of2<>())
.union(verifiedTriples.map(new Value1Of2<>()))
.distinct(new Id<>());
return subgraph.getFactory().fromDataSets(verifiedVertices, verifiedEdges);
}
}
代码示例来源:origin: dbs-leipzig/gradoop
@Override
public LogicalGraph sample(LogicalGraph graph) {
graph = new DistinctVertexDegrees(
DEGREE_PROPERTY_KEY,
IN_DEGREE_PROPERTY_KEY,
OUT_DEGREE_PROPERTY_KEY,
true).execute(graph);
DataSet<Vertex> newVertices = graph.getVertices()
.filter(new LimitedDegreeVertexRandomFilter<>(
sampleSize, randomSeed, degreeThreshold, degreeType))
.map(new PropertyRemover<>(DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(IN_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(OUT_DEGREE_PROPERTY_KEY));
DataSet<Edge> newEdges = graph.getEdges()
.join(newVertices)
.where(new SourceId<>()).equalTo(new Id<>())
.with(new LeftSide<>())
.join(newVertices)
.where(new TargetId<>()).equalTo(new Id<>())
.with(new LeftSide<>());
return graph.getConfig().getLogicalGraphFactory()
.fromDataSets(graph.getGraphHead(), newVertices, newEdges);
}
代码示例来源:origin: org.gradoop/gradoop-flink
/**
* {@inheritDoc}
*/
@Override
public LogicalGraph sample(LogicalGraph graph) {
graph = new DistinctVertexDegrees(
DEGREE_PROPERTY_KEY,
IN_DEGREE_PROPERTY_KEY,
OUT_DEGREE_PROPERTY_KEY,
true).execute(graph);
DataSet<Vertex> newVertices = graph.getVertices()
.filter(new LimitedDegreeVertexRandomFilter<>(
sampleSize, randomSeed, degreeThreshold, degreeType))
.map(new PropertyRemover<>(DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(IN_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(OUT_DEGREE_PROPERTY_KEY));
DataSet<Edge> newEdges = graph.getEdges()
.join(newVertices)
.where(new SourceId<>()).equalTo(new Id<>())
.with(new LeftSide<>())
.join(newVertices)
.where(new TargetId<>()).equalTo(new Id<>())
.with(new LeftSide<>());
return graph.getConfig().getLogicalGraphFactory()
.fromDataSets(graph.getGraphHead(), newVertices, newEdges);
}
代码示例来源:origin: dbs-leipzig/gradoop
.map(new PropertyRemover<>(IN_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(OUT_DEGREE_PROPERTY_KEY))
.map(new PropertyRemover<>(PROPERTY_KEY_MAX_DEGREE));
内容来源于网络,如有侵权,请联系作者删除!