Usage of the org.apache.flink.api.java.operators.MapOperator class, with code examples


This article collects Java code examples for the org.apache.flink.api.java.operators.MapOperator class and shows how it is used in practice. The examples come from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the class:

Package path: org.apache.flink.api.java.operators.MapOperator
Class name: MapOperator

About MapOperator

This operator represents the application of a "map" function on a data set, and the result data set produced by the function.
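Before the sourced examples, here is a minimal self-contained sketch (data and names invented for illustration): DataSet.map(...) returns a MapOperator, on which settings such as name() and setParallelism() can then be chained.

// Requires org.apache.flink.api.java.ExecutionEnvironment, org.apache.flink.api.java.DataSet
// and org.apache.flink.api.common.functions.MapFunction.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// map() returns a MapOperator<String, Integer>; name() labels it in the execution plan.
DataSet<Integer> lengths = env.fromElements("a", "bb", "ccc")
    .map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    })
    .name("String length");

lengths.print();  // 1, 2, 3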

Code examples

Code example source: apache/flink

.returns(returnType)
  .setParallelism(parallelism)
  .name("Translate edge IDs");

Code example source: apache/flink

.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.map(new CountAppender())
.groupBy(0).reduce(new CentroidAccumulator())
.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
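This fragment is from the Flink KMeans example. As context for how the broadcast set named "centroids" is consumed, here is a simplified sketch of a SelectNearestCenter-style function (condensed from the example; treat field and type details as approximate):

public static class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) {
        // Fetch the DataSet registered via withBroadcastSet(..., "centroids").
        centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

    @Override
    public Tuple2<Integer, Point> map(Point p) {
        int closestId = -1;
        double minDistance = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double distance = p.euclideanDistance(c);
            if (distance < minDistance) {
                minDistance = distance;
                closestId = c.id;
            }
        }
        return new Tuple2<>(closestId, p);
    }
}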

Code example source: apache/flink

/**
 * Count the number of elements in a DataSet.
 *
 * @param input DataSet of elements to be counted
 * @param <T> element type
 * @return count
 */
public static <T> DataSet<LongValue> count(DataSet<T> input) {
  return input
    .map(new MapTo<>(new LongValue(1)))
      .returns(LONG_VALUE_TYPE_INFO)
      .name("Emit 1")
    .reduce(new AddLongValue())
      .name("Sum");
}
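A possible call site for this helper, with the environment and data invented for illustration:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> words = env.fromElements("a", "b", "c");

// count() emits one LongValue per element, then sums them into a single record.
long n = count(words).collect().get(0).getValue();  // 3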

Code example source: apache/flink

/**
 * Apply a function to the attribute of each vertex in the graph.
 *
 * @param mapper the map function to apply.
 * @param returnType the explicit return type.
 * @return a new graph
 */
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
  DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
      new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
        private Vertex<K, NV> output = new Vertex<>();
        public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
          output.f0 = value.f0;
          output.f1 = mapper.map(value);
          return output;
        }
      })
      .returns(returnType)
      .withForwardedFields("f0")
        .name("Map vertices");
  return new Graph<>(mappedVertices, this.edges, this.context);
}
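A hypothetical call site for the method above (the graph instance and its type parameters are assumed): it doubles each vertex value and states the result type explicitly, since the lambda's generic types are erased at runtime.

TypeInformation<Vertex<Long, Long>> returnType =
    TypeInformation.of(new TypeHint<Vertex<Long, Long>>() {});

// MapFunction is a SAM interface, so a lambda works here.
Graph<Long, Long, Double> doubled =
    graph.mapVertices(v -> v.getValue() * 2, returnType);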

Code example source: apache/flink

private void createPrintSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
    .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}

Code example source: apache/flink

private void createCsvSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
      .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}

Code example source: apache/flink

.withForwardedFields("*").setParallelism(p).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
  .withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
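The "*" forwarded-fields declaration tells the optimizer that every field passes through unchanged, so existing partitioning and sort order survive the map. For reference, the IdentityMapper used above is essentially an identity MapFunction; a minimal sketch:

// Identity map function matching the "*" forwarded-fields declaration.
public static class IdentityMapper<T> implements MapFunction<T, T> {
    @Override
    public T map(T value) {
        return value;
    }
}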

Code example source: apache/flink

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
  DataSet<Edge<K, EV>> edges = graph.getEdges();
  if (hasNullValueEdges(edges)) {
    return edges
      .map(new EdgeToTuple2Map<>())
      .name("Edge to Tuple2")
      .setParallelism(parallelism.getValue().intValue());
  } else {
    return edges;
  }
}

Code example source: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<Long> input = env.generateSequence(1, 10);
  DataSet<Long> bc1 = env.generateSequence(1, 5);
  DataSet<Long> bc2 = env.generateSequence(6, 10);
  List<Long> result = input
      .map(new Mapper())
      .withBroadcastSet(bc1.union(bc2), BC_NAME)
      .reduce(new Reducer())
      .collect();
  Assert.assertEquals(Long.valueOf(3025), result.get(0));
}

Code example source: apache/flink

private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
    for (int i = 0; i < branches; i++) {
      final int ii = i;

      if (stats != null) {
        input = input.map(
          new RichMapFunction<String, String>() {
            @Override
            public String map(String value) {
              return value;
            }
        }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
      }

      DataSet<String> branch = input
        .map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))  // Tuple2<Integer, String>: key first, value second
        .groupBy(0)
        .minBy(1)
        .map(kv -> kv.f1).returns(Types.STRING);
      if (stats == null) {
        stats = branch;
      } else {
        stats = stats.union(branch);
      }
    }
    return stats.map(s -> "(" + s + ").stats");
  }

Code example source: apache/flink

// Fragmentary excerpt; "// ..." marks lines omitted from the original file.
.getEdges()
.map(new ExtractEdgeIDs<>())
  .setParallelism(parallelism)
  .name("Extract edge IDs");

// ...
  .setParallelism(parallelism)
  .name("Initial scores")
.groupBy(0)
.reduce(new SumScores<>())
.setCombineHint(CombineHint.HASH)
  .setParallelism(parallelism)
  .name("Square")
.reduce(new Sum())
  .setParallelism(parallelism)
  .name("Sum");

// ...
  .setParallelism(parallelism)
  .name("Square")
.reduce(new Sum())
  .setParallelism(parallelism)
  .name("Sum");

// ...
.closeWith(passThrough)
.map(new TranslateResult<>())
  .setParallelism(parallelism)
  .name("Map result");

Code example source: apache/flink

@Test
public void testBranchesOnlyInBCVariables1() {
  try{
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100);
    DataSet<Long> input = env.generateSequence(1, 10);
    DataSet<Long> bc_input = env.generateSequence(1, 10);
    
    input
      .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
      .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
      .output(new DiscardingOutputFormat<Long>());
    
    Plan plan = env.createProgramPlan();
    compileNoStats(plan);
  }
  catch(Exception e){
    e.printStackTrace();
    fail(e.getMessage());
  }
}

Code example source: org.gradoop/gradoop-flink

// Three parallel excerpts from the same source file (graph heads, vertices, edges);
// each stream-and-collect chain is shown without its surrounding statement.
.map(label -> Tuple2.of(label, env.readTextFile(getGraphHeadCSVPath(label))
 .map(new CSVLineToGraphHead(graphHeadFactory))
 .withBroadcastSet(metaDataBroadcast, BC_METADATA)
 .filter(graphHead -> graphHead.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(label -> Tuple2.of(label, env.readTextFile(getVertexCSVPath(label))
 .map(new CSVLineToVertex(vertexFactory))
 .withBroadcastSet(metaDataBroadcast, BC_METADATA)
 .filter(vertex -> vertex.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

.map(label -> Tuple2.of(label, env.readTextFile(getEdgeCSVPath(label))
 .map(new CSVLineToEdge(edgeFactory))
 .withBroadcastSet(metaDataBroadcast, BC_METADATA)
 .filter(edge -> edge.getLabel().equals(label))))
.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

Code example source: apache/flink

case IN:
  return edges.map(new ProjectVertexWithEdgeValueMap<>(1))
      .withForwardedFields("f1->f0")
        .name("Vertex with in-edges")
      .groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
        .name("Reduce on edges");
case OUT:
  return edges.map(new ProjectVertexWithEdgeValueMap<>(0))
      .withForwardedFields("f0->f0")
        .name("Vertex with out-edges")
      .groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
        .name("Reduce on edges");
case ALL:

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
    .run(new EdgeDegreePair<K, VV, EV>()
      .setReduceOnTargetId(reduceOnTargetId)
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
    .map(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: apache/flink

@Test
public void testBranchBeforeIteration() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> source1 = env.generateSequence(0,1);
  DataSet<Long> source2 = env.generateSequence(0,1);
  IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
  DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
  DataSet<Long> loopRes = loopHead.closeWith(loopTail);
  DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
  map.output(new DiscardingOutputFormat<Long>());
  Plan plan = env.createProgramPlan();
  try {
    compileNoStats(plan);
  }
  catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
  }
}

Code example source: dbs-leipzig/gradoop

@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig config) {
 return config.getExecutionEnvironment()
  .readTextFile(path)
  .map(line -> StringEscaper.split(line, CSVConstants.TOKEN_DELIMITER, 3))
  .map(tokens -> Tuple3.of(tokens[0], tokens[1], tokens[2]))
  .returns(new TypeHint<Tuple3<String, String, String>>() {});
}

Code example source: apache/flink

.map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.rebalance()
.groupBy(0).reduce(new KMeans.CentroidAccumulator())
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

Code example source: apache/flink

@Test
public void testUnaryOp() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
    DataSet<String> bcData = env.fromElements(SUFFIX);
    List<String> result = new ArrayList<String>();
    env.fromElements(TEST_DATA)
        .map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME)
        .output(new LocalCollectionOutputFormat<String>(result));
    env.execute();
    assertEquals(TEST_DATA.length, result.size());
    for (String s : result) {
      assertTrue(s.indexOf(SUFFIX) > 0);
    }
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

Code example source: apache/flink

DataSet<Tuple3<Integer, Integer, Integer>> in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
in1 = in1.map(new MockMapper()).withForwardedFields("*")
    .groupBy(0)
    .reduce(new MockReducer()).withForwardedFields("f0->f1");
in2 = in2.map(new MockMapper()).withForwardedFields("*")
    .groupBy(1)
    .reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
