Usage of the org.apache.flink.api.java.operators.MapOperator.name() method, with code examples

This article collects code examples of the org.apache.flink.api.java.operators.MapOperator.name() method in Java and demonstrates how MapOperator.name() is used. The examples were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a practical reference. Details of the MapOperator.name() method:

Package: org.apache.flink.api.java.operators
Class: MapOperator
Method: name

About MapOperator.name

name(String) assigns a custom name to the operation, overriding the default name derived from the user function's class. The name appears in logs, the execution plan, and the web UI, which makes larger programs easier to inspect and debug.
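
Before the sourced examples below, here is a minimal, self-contained sketch (not taken from any of the projects below) showing the typical call pattern. The class name, the sequence bounds, and the operator label "Double values" are illustrative only:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MapOperatorNameExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // map() returns a MapOperator; name() labels that operator so it can be
    // identified in logs, the execution plan, and the web UI.
    DataSet<Long> doubled = env.generateSequence(1, 100)
      .map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) {
          return value * 2;
        }
      })
      .name("Double values");

    doubled.print();
  }
}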

Code examples

Code example source: apache/flink

/**
 * @return The IDs of the vertices as DataSet
 */
public DataSet<K> getVertexIds() {
  return vertices.map(new ExtractVertexIDMapper<>()).name("Vertex IDs");
}

Code example source: apache/flink

/**
 * @return The IDs of the edges as DataSet
 */
public DataSet<Tuple2<K, K>> getEdgeIds() {
  return edges.map(new ExtractEdgeIDsMapper<>()).name("Edge IDs");
}

Code example source: apache/flink

/**
 * Reverse the direction of the edges in the graph.
 *
 * @return a new graph with all edges reversed
 * @throws UnsupportedOperationException
 */
public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
  DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<>()).name("Reverse edges");
  return new Graph<>(vertices, reversedEdges, this.context);
}

Code example source: apache/flink

/**
 * Creates a Graph from CSV input without vertex values or edge values.
 * @param vertexKey the type of the vertex IDs
 * @return a Graph where the vertex IDs are read from the edges input file.
 */
public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
  if (edgeReader == null) {
    throw new RuntimeException("The edge input file cannot be null!");
  }
  DataSet<Edge<K, NullValue>> edges = edgeReader
    .types(vertexKey, vertexKey)
      .name(GraphCsvReader.class.getName())
    .map(new Tuple2ToEdgeMap<>())
      .name("Type conversion");
  return Graph.fromDataSet(edges, executionContext);
}

Code example source: apache/flink

/**
 * Creates a graph from a DataSet of Tuple2 objects for edges.
 * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
 * and the target ID will be the second field of the Tuple2.
 *
 * <p>Edge value types and Vertex values types will be set to NullValue.
 *
 * @param edges a DataSet of Tuple2.
 * @param context the flink execution environment.
 * @return the newly created graph.
 */
public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
    ExecutionEnvironment context) {
  DataSet<Edge<K, NullValue>> edgeDataSet = edges
    .map(new Tuple2ToEdgeMap<>())
      .name("To Edge");
  return fromDataSet(edgeDataSet, context);
}

Code example source: apache/flink

private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
  DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
  DataSet<byte[]> result = op1
    .partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
    .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
  sets.add(info.setID, result);
}

Code example source: apache/flink

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
  DataSet<Edge<K, EV>> edges = graph.getEdges();
  if (hasNullValueEdges(edges)) {
    return edges
      .map(new EdgeToTuple2Map<>())
      .name("Edge to Tuple2")
      .setParallelism(parallelism.getValue().intValue());
  } else {
    return edges;
  }
}

Code example source: apache/flink

private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
  DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
  DataSet<byte[]> result = op
    .distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
    .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
  sets.add(info.setID, result);
}

Code example source: apache/flink

@SuppressWarnings("unchecked")
private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
  if (!(info.types instanceof TupleTypeInfo)) {
    throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
  }
  Path path = new Path(info.path);
  String lineD = info.lineDelimiter;
  String fieldD = info.fieldDelimiter;
  TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types;
  sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource")
    .map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep"));
}

Code example source: apache/flink

/**
 * Apply a function to the attribute of each vertex in the graph.
 *
 * @param mapper the map function to apply.
 * @param returnType the explicit return type.
 * @return a new graph
 */
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
  DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
      new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
        private Vertex<K, NV> output = new Vertex<>();
        public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
          output.f0 = value.f0;
          output.f1 = mapper.map(value);
          return output;
        }
      })
      .returns(returnType)
      .withForwardedFields("f0")
        .name("Map vertices");
  return new Graph<>(mappedVertices, this.edges, this.context);
}

Code example source: apache/flink

private void createPrintSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
    .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}

Code example source: apache/flink

private void createCsvSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
      .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}

Code example source: apache/flink

private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource")
    .map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep"));
}

Code example source: apache/flink

private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource")
    .map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep"));
}

Code example source: apache/flink

private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
    .map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
}

Code example source: apache/flink

/**
 * Count the number of elements in a DataSet.
 *
 * @param input DataSet of elements to be counted
 * @param <T> element type
 * @return count
 */
public static <T> DataSet<LongValue> count(DataSet<T> input) {
  return input
    .map(new MapTo<>(new LongValue(1)))
      .returns(LONG_VALUE_TYPE_INFO)
      .name("Emit 1")
    .reduce(new AddLongValue())
      .name("Sum");
}

Code example source: apache/flink

@Override
public Graph<LongValue, NullValue, NullValue> generate() {
  Preconditions.checkState(vertexPairCount > 0);
  // Vertices
  long vertexCount = 2 * vertexPairCount;
  DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
  // Edges
  LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
  DataSet<Edge<LongValue, NullValue>> edges = env
    .fromParallelCollection(iterator, LongValue.class)
      .setParallelism(parallelism)
      .name("Edge iterators")
    .map(new LinkVertexToSingletonNeighbor())
      .setParallelism(parallelism)
      .name("Complete graph edges");
  // Graph
  return Graph.fromDataSet(vertices, edges, env);
}

Code example source: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    DataSet<byte[]> op = sets.getDataSet(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First"));
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  } else if (sets.isSortedGrouping(info.parentID)) {
    SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  }
}

Code example source: apache/flink

@Test
public void testBranchBeforeIteration() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> source1 = env.generateSequence(0,1);
  DataSet<Long> source2 = env.generateSequence(0,1);
  IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
  DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
  DataSet<Long> loopRes = loopHead.closeWith(loopTail);
  DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
  map.output(new DiscardingOutputFormat<Long>());
  Plan plan = env.createProgramPlan();
  try {
    compileNoStats(plan);
  }
  catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
  }
}

Code example source: apache/flink

@Test
public void testBranchAfterIteration() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> sourceA = env.generateSequence(0,1);
  IterativeDataSet<Long> loopHead = sourceA.iterate(10);
  DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
  DataSet<Long> loopRes = loopHead.closeWith(loopTail);
  loopRes.output(new DiscardingOutputFormat<Long>());
  loopRes.map(new IdentityMapper<Long>())
      .output(new DiscardingOutputFormat<Long>());
  Plan plan = env.createProgramPlan();
  try {
    compileNoStats(plan);
  }
  catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
  }
}
