本文整理了Java中org.apache.flink.api.java.operators.MapOperator.name()方法的一些代码示例,展示了MapOperator.name()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.name()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:name
暂无
代码示例来源:origin: apache/flink
/**
 * Projects the vertex set of this graph onto its IDs.
 *
 * @return a DataSet containing the ID of each vertex
 */
public DataSet<K> getVertexIds() {
    final DataSet<K> vertexIds = vertices
            .map(new ExtractVertexIDMapper<>())
            .name("Vertex IDs");
    return vertexIds;
}
代码示例来源:origin: apache/flink
/**
 * Projects the edge set of this graph onto its IDs.
 *
 * @return a DataSet containing one {@code Tuple2} of IDs per edge
 */
public DataSet<Tuple2<K, K>> getEdgeIds() {
    final DataSet<Tuple2<K, K>> edgeIds = edges
            .map(new ExtractEdgeIDsMapper<>())
            .name("Edge IDs");
    return edgeIds;
}
代码示例来源:origin: apache/flink
/**
 * Builds a graph whose edges point in the opposite direction.
 *
 * <p>The vertex set and the execution context are carried over unchanged;
 * only the edge set is mapped through {@code ReverseEdgesMap}.
 *
 * @return a new graph with all edges reversed
 * @throws UnsupportedOperationException declared by the signature; this method
 *         body does not throw it directly
 */
public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
    final DataSet<Edge<K, EV>> flippedEdges = edges
            .map(new ReverseEdgesMap<>())
            .name("Reverse edges");

    return new Graph<>(vertices, flippedEdges, this.context);
}
代码示例来源:origin: apache/flink
/**
 * Builds a Graph from the CSV edge input, with neither vertex values nor edge values.
 *
 * <p>Both fields read from each edge record are typed as {@code vertexKey} and the
 * resulting tuples are converted to edges.
 *
 * @param vertexKey the type of the vertex IDs
 * @param <K> the vertex ID type
 * @return a Graph where the vertex IDs are read from the edges input file
 * @throws RuntimeException if no edge reader has been configured
 */
public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
    if (edgeReader != null) {
        DataSet<Edge<K, NullValue>> typedEdges = edgeReader
                .types(vertexKey, vertexKey)
                .name(GraphCsvReader.class.getName())
                .map(new Tuple2ToEdgeMap<>())
                .name("Type conversion");

        return Graph.fromDataSet(typedEdges, executionContext);
    }

    throw new RuntimeException("The edge input file cannot be null!");
}
代码示例来源:origin: apache/flink
/**
 * Creates a graph from a DataSet of Tuple2 objects describing the edges.
 *
 * <p>Every Tuple2 is turned into one Edge: the first field becomes the source ID
 * and the second field becomes the target ID. Edge values and vertex values are
 * of type NullValue.
 *
 * @param edges a DataSet of Tuple2
 * @param context the Flink execution environment
 * @param <K> the vertex ID type
 * @return the newly created graph
 */
public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(
        DataSet<Tuple2<K, K>> edges,
        ExecutionEnvironment context) {

    final DataSet<Edge<K, NullValue>> convertedEdges =
            edges.map(new Tuple2ToEdgeMap<>()).name("To Edge");

    return fromDataSet(convertedEdges, context);
}
代码示例来源:origin: apache/flink
/**
 * Hash-partitions the parent data set on the keys listed in {@code info}, maps the
 * partitioned records through a {@code KeyDiscarder}, and registers the outcome
 * under {@code info.setID}.
 *
 * @param info description of the operation (parent ID, keys, parallelism, set ID)
 * @param <K> the key tuple type of the parent data set
 */
private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
    DataSet<Tuple2<K, byte[]>> keyedInput = sets.getDataSet(info.parentID);
    String[] partitionKeys = info.keys.toArray(new String[info.keys.size()]);

    DataSet<byte[]> partitioned = keyedInput
            .partitionByHash(partitionKeys)
            .setParallelism(info.parallelism)
            .map(new KeyDiscarder<K>())
            .setParallelism(info.parallelism)
            .name("HashPartitionPostStep");

    sets.add(info.setID, partitioned);
}
代码示例来源:origin: apache/flink
/**
 * Returns the edges of {@code graph}, converted through {@code EdgeToTuple2Map}
 * when {@code hasNullValueEdges} reports true for them; otherwise the edges are
 * returned untouched.
 *
 * @param graph the input graph
 * @return the (possibly converted) edge data set
 * @throws Exception declared by the overridden signature
 */
@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
    DataSet<Edge<K, EV>> edges = graph.getEdges();

    // Nothing to convert: hand the edges back as they are.
    if (!hasNullValueEdges(edges)) {
        return edges;
    }

    return edges
            .map(new EdgeToTuple2Map<>())
            .name("Edge to Tuple2")
            .setParallelism(parallelism.getValue().intValue());
}
代码示例来源:origin: apache/flink
/**
 * Applies a distinct on the keys listed in {@code info} to the parent data set,
 * maps the remaining records through a {@code KeyDiscarder}, and registers the
 * outcome under {@code info.setID}.
 *
 * @param info description of the operation (parent ID, keys, parallelism, set ID)
 * @param <K> the key tuple type of the parent data set
 */
private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
    DataSet<Tuple2<K, byte[]>> keyedInput = sets.getDataSet(info.parentID);
    String[] distinctKeys = info.keys.toArray(new String[info.keys.size()]);

    DataSet<byte[]> deduplicated = keyedInput
            .distinct(distinctKeys)
            .setParallelism(info.parallelism)
            .name("Distinct")
            .map(new KeyDiscarder<K>())
            .setParallelism(info.parallelism)
            .name("DistinctPostStep");

    sets.add(info.setID, deduplicated);
}
代码示例来源:origin: apache/flink
/**
 * Creates a CSV source from the path and delimiters in {@code info}, maps its
 * records through a {@code SerializerMap}, and registers the outcome under
 * {@code info.setID}.
 *
 * @param env the execution environment the source is created in
 * @param info description of the source (path, delimiters, types, parallelism, set ID)
 * @param <T> the tuple type produced by the source
 * @throws RuntimeException if {@code info.types} is not a {@code TupleTypeInfo}
 */
@SuppressWarnings("unchecked")
private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
    if (!(info.types instanceof TupleTypeInfo)) {
        throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
    }

    Path csvPath = new Path(info.path);
    // Unchecked cast; the instanceof guard above only verifies the raw type.
    TupleTypeInfo<T> tupleTypes = (TupleTypeInfo<T>) info.types;

    sets.add(
            info.setID,
            env.createInput(
                            new TupleCsvInputFormat<>(csvPath, info.lineDelimiter, info.fieldDelimiter, tupleTypes),
                            tupleTypes)
                    .setParallelism(info.parallelism)
                    .name("CsvSource")
                    .map(new SerializerMap<T>())
                    .setParallelism(info.parallelism)
                    .name("CsvSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Applies a function to every vertex of the graph and uses the result as the
 * new vertex value. Vertex IDs are kept as they are.
 *
 * @param mapper the map function to apply
 * @param returnType the explicit return type
 * @param <NV> the new vertex value type
 * @return a new graph with the mapped vertices and the original edges
 */
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
    DataSet<Vertex<K, NV>> transformed = vertices
            .map(new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
                // A single Vertex instance is refilled and returned on every call.
                private Vertex<K, NV> reused = new Vertex<>();

                @Override
                public Vertex<K, NV> map(Vertex<K, VV> vertex) throws Exception {
                    reused.f0 = vertex.f0;
                    reused.f1 = mapper.map(vertex);
                    return reused;
                }
            })
            .returns(returnType)
            .withForwardedFields("f0")
            .name("Map vertices");

    return new Graph<>(transformed, this.edges, this.context);
}
代码示例来源:origin: apache/flink
/**
 * Maps the parent data set through a {@code StringDeserializerMap} and attaches a
 * {@code PrintingOutputFormat} built from {@code info.toError} as its sink.
 *
 * @param info description of the sink (parent ID, parallelism, toError flag)
 */
private void createPrintSink(PythonOperationInfo info) {
    DataSet<byte[]> input = sets.getDataSet(info.parentID);

    input
            .map(new StringDeserializerMap())
            .setParallelism(info.parallelism)
            .name("PrintSinkPreStep")
            .output(new PrintingOutputFormat<String>(info.toError))
            .setParallelism(info.parallelism);
}
代码示例来源:origin: apache/flink
/**
 * Maps the parent data set through a {@code StringTupleDeserializerMap} and writes
 * the result as CSV using the path, delimiters and write mode from {@code info}.
 *
 * @param info description of the sink (parent ID, path, delimiters, write mode, parallelism)
 */
private void createCsvSink(PythonOperationInfo info) {
    DataSet<byte[]> input = sets.getDataSet(info.parentID);

    input
            .map(new StringTupleDeserializerMap())
            .setParallelism(info.parallelism)
            .name("CsvSinkPreStep")
            .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode)
            .setParallelism(info.parallelism)
            .name("CsvSink");
}
代码示例来源:origin: apache/flink
/**
 * Reads the text file at {@code info.path}, maps its lines through a
 * {@code SerializerMap}, and registers the outcome under {@code info.setID}.
 *
 * @param env the execution environment the source is created in
 * @param info description of the source (path, parallelism, set ID)
 */
private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
    sets.add(
            info.setID,
            env.readTextFile(info.path)
                    .setParallelism(info.parallelism)
                    .name("TextSource")
                    .map(new SerializerMap<String>())
                    .setParallelism(info.parallelism)
                    .name("TextSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Generates the sequence from {@code info.frm} to {@code info.to}, maps it through
 * a {@code SerializerMap}, and registers the outcome under {@code info.setID}.
 *
 * @param env the execution environment the source is created in
 * @param info description of the source (sequence bounds, parallelism, set ID)
 */
private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
    sets.add(
            info.setID,
            env.generateSequence(info.frm, info.to)
                    .setParallelism(info.parallelism)
                    .name("SequenceSource")
                    .map(new SerializerMap<Long>())
                    .setParallelism(info.parallelism)
                    .name("SequenceSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Creates a source from the collection {@code info.values}, maps it through a
 * {@code SerializerMap}, and registers the outcome under {@code info.setID}.
 *
 * @param env the execution environment the source is created in
 * @param info description of the source (values, parallelism, set ID)
 */
private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
    sets.add(
            info.setID,
            env.fromCollection(info.values)
                    .setParallelism(info.parallelism)
                    .name("ValueSource")
                    .map(new SerializerMap<>())
                    .setParallelism(info.parallelism)
                    .name("ValueSourcePostStep"));
}
代码示例来源:origin: apache/flink
/**
 * Counts the elements of a DataSet by mapping each element to a
 * {@code LongValue} of 1 and reducing the mapped values with {@code AddLongValue}.
 *
 * @param input DataSet of elements to be counted
 * @param <T> element type
 * @return count
 */
public static <T> DataSet<LongValue> count(DataSet<T> input) {
    final LongValue one = new LongValue(1);

    final DataSet<LongValue> total = input
            .map(new MapTo<>(one))
            .returns(LONG_VALUE_TYPE_INFO)
            .name("Emit 1")
            .reduce(new AddLongValue())
            .name("Sum");

    return total;
}
代码示例来源:origin: apache/flink
/**
 * Generates a graph with {@code 2 * vertexPairCount} vertices. Edges are produced
 * by mapping the vertex ID sequence through {@code LinkVertexToSingletonNeighbor}.
 *
 * @return the generated graph
 * @throws IllegalStateException presumably, via {@code Preconditions.checkState},
 *         when {@code vertexPairCount} is not positive
 */
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
    Preconditions.checkState(vertexPairCount > 0);

    // Two vertices per pair.
    long totalVertices = 2 * vertexPairCount;

    DataSet<Vertex<LongValue, NullValue>> vertexSet =
            GraphGeneratorUtils.vertexSequence(env, parallelism, totalVertices);

    // IDs run from 0 to totalVertices - 1.
    LongValueSequenceIterator idRange = new LongValueSequenceIterator(0, totalVertices - 1);

    DataSet<Edge<LongValue, NullValue>> edgeSet = env
            .fromParallelCollection(idRange, LongValue.class)
            .setParallelism(parallelism)
            .name("Edge iterators")
            .map(new LinkVertexToSingletonNeighbor())
            .setParallelism(parallelism)
            .name("Complete graph edges");

    return Graph.fromDataSet(vertexSet, edgeSet, env);
}
代码示例来源:origin: apache/flink
/**
 * Applies {@code first(info.count)} to the parent set and registers the outcome
 * under {@code info.setID}.
 *
 * <p>The parent may be a plain data set, an unsorted grouping or a sorted grouping.
 * For the two grouping cases the result is additionally mapped through a
 * {@code KeyDiscarder}. If the parent is none of the three, nothing is registered.
 *
 * @param info description of the operation (parent ID, count, parallelism, set ID)
 * @param <K> the key tuple type of a grouped parent
 */
private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
    if (sets.isDataSet(info.parentID)) {
        DataSet<byte[]> plain = sets.getDataSet(info.parentID);
        sets.add(
                info.setID,
                plain.first(info.count)
                        .setParallelism(info.parallelism)
                        .name("First"));
        return;
    }

    if (sets.isUnsortedGrouping(info.parentID)) {
        UnsortedGrouping<Tuple2<K, byte[]>> unsorted = sets.getUnsortedGrouping(info.parentID);
        sets.add(
                info.setID,
                unsorted.first(info.count)
                        .setParallelism(info.parallelism)
                        .name("First")
                        .map(new KeyDiscarder<K>())
                        .setParallelism(info.parallelism)
                        .name("FirstPostStep"));
        return;
    }

    if (sets.isSortedGrouping(info.parentID)) {
        SortedGrouping<Tuple2<K, byte[]>> sorted = sets.getSortedGrouping(info.parentID);
        sets.add(
                info.setID,
                sorted.first(info.count)
                        .setParallelism(info.parallelism)
                        .name("First")
                        .map(new KeyDiscarder<K>())
                        .setParallelism(info.parallelism)
                        .name("FirstPostStep"));
    }
}
代码示例来源:origin: apache/flink
/**
 * Compiles a plan in which one source feeds both a mapper inside a bulk iteration
 * and a mapper after it; the test fails if compilation throws.
 */
@Test
public void testBranchBeforeIteration() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);

    DataSet<Long> mapperInput = env.generateSequence(0, 1);
    DataSet<Long> iterationInput = env.generateSequence(0, 1);

    IterativeDataSet<Long> iteration = iterationInput.iterate(10).name("Loop");

    // The in-loop mapper reads mapperInput and receives the loop head as a broadcast set.
    DataSet<Long> inLoop = mapperInput
            .map(new IdentityMapper<Long>())
            .withBroadcastSet(iteration, "BC")
            .name("In-Loop Mapper");
    DataSet<Long> iterationResult = iteration.closeWith(inLoop);

    // The post-loop mapper reads the same source and receives the loop result as a broadcast set.
    DataSet<Long> postLoop = mapperInput
            .map(new IdentityMapper<Long>())
            .withBroadcastSet(iterationResult, "BC")
            .name("Post-Loop Mapper");
    postLoop.output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();

    try {
        compileNoStats(plan);
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
/**
 * Compiles a plan in which the result of a bulk iteration is consumed twice, once
 * directly by a sink and once through an additional mapper; the test fails if
 * compilation throws.
 */
@Test
public void testBranchAfterIteration() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);

    DataSet<Long> input = env.generateSequence(0, 1);

    IterativeDataSet<Long> iteration = input.iterate(10);
    DataSet<Long> stepResult = iteration.map(new IdentityMapper<Long>()).name("Mapper");
    DataSet<Long> iterationResult = iteration.closeWith(stepResult);

    // First consumer: the iteration result goes straight to a sink.
    iterationResult.output(new DiscardingOutputFormat<Long>());

    // Second consumer: the iteration result is mapped before reaching a sink.
    iterationResult
            .map(new IdentityMapper<Long>())
            .output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();

    try {
        compileNoStats(plan);
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
内容来源于网络,如有侵权,请联系作者删除!