Usage of the org.apache.flink.api.java.operators.MapOperator.setParallelism() method, with code examples

This article collects Java code examples for org.apache.flink.api.java.operators.MapOperator.setParallelism(), showing how the method is used in practice. The snippets come from curated open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the method:

Package: org.apache.flink.api.java.operators
Class: MapOperator
Method: setParallelism

About MapOperator.setParallelism

Sets the parallelism for this operator, i.e., how many parallel instances of the map function the runtime uses. The value must be 1 or more, and it overrides the execution environment's default parallelism for this single operator only.
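Before the excerpts from real projects, here is a minimal, self-contained sketch of the typical call pattern, assuming the legacy DataSet API is on the classpath; the class name SetParallelismExample and the operator name "DoubleMapper" are illustrative, not taken from the examples below:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SetParallelismExample {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // fromElements creates a non-parallel source (parallelism 1).
    DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

    // map() returns a MapOperator; setParallelism(4) requests four parallel
    // instances of the map function for this operator only, overriding the
    // environment default. name() labels the operator in the plan/web UI.
    DataSet<Integer> doubled = numbers
      .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) {
          return value * 2;
        }
      })
      .setParallelism(4)
      .name("DoubleMapper");

    doubled.print(); // triggers execution
  }
}

Note that setParallelism applies only to the operator it is called on; as the examples below show, each operator in a chain needs its own setParallelism call if it should deviate from the environment default.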

Code examples

Code example source: apache/flink

private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
  DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
  // Hash-partition on the requested keys, then discard the key field;
  // both operators run with the parallelism requested by the Python plan.
  DataSet<byte[]> result = op1
    .partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
    .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
  sets.add(info.setID, result);
}

Code example source: apache/flink

private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
  DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
  DataSet<byte[]> result = op
    .distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
    .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
  sets.add(info.setID, result);
}

Code example source: apache/flink

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
  DataSet<Edge<K, EV>> edges = graph.getEdges();
  if (hasNullValueEdges(edges)) {
    return edges
      .map(new EdgeToTuple2Map<>())
      .name("Edge to Tuple2")
      .setParallelism(parallelism.getValue().intValue());
  } else {
    return edges;
  }
}

Code example source: apache/flink

private void createTextSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringDeserializerMap()).setParallelism(info.parallelism)
    .writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
}

Code example source: apache/flink

@SuppressWarnings("unchecked")
private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
  if (!(info.types instanceof TupleTypeInfo)) {
    throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
  }
  Path path = new Path(info.path);
  String lineD = info.lineDelimiter;
  String fieldD = info.fieldDelimiter;
  TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types;
  sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource")
    .map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep"));
}

Code example source: apache/flink

private void createPrintSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
    .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}

Code example source: apache/flink

private void createCsvSink(PythonOperationInfo info) {
  DataSet<byte[]> parent = sets.getDataSet(info.parentID);
  parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
      .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
}

Code example source: apache/flink

private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource")
    .map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep"));
}

Code example source: apache/flink

private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource")
    .map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep"));
}

Code example source: apache/flink

private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
  sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
    .map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
}

Code example source: apache/flink

@Override
public Graph<LongValue, NullValue, NullValue> generate() {
  Preconditions.checkState(vertexPairCount > 0);
  // Vertices
  long vertexCount = 2 * vertexPairCount;
  DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
  // Edges
  LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
  DataSet<Edge<LongValue, NullValue>> edges = env
    .fromParallelCollection(iterator, LongValue.class)
      .setParallelism(parallelism)
      .name("Edge iterators")
    .map(new LinkVertexToSingletonNeighbor())
      .setParallelism(parallelism)
      .name("Complete graph edges");
  // Graph
  return Graph.fromDataSet(vertices, edges, env);
}

Code example source: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    DataSet<byte[]> op = sets.getDataSet(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First"));
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  } else if (sets.isSortedGrouping(info.parentID)) {
    SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  }
}

Code example source: apache/flink

@Test
public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
  /*
   * check correctness of all-groupreduce for tuples with combine
   */
  org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env)
      .map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
  Configuration cfg = new Configuration();
  cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
  DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
      .withParameters(cfg);
  List<Tuple2<Integer, String>> result = reduceDs.collect();
  String expected = "322," +
      "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
  compareResultAsTuples(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionByKeyField() throws Exception {
  /*
   * Test sort partition on key field
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
      .sortPartition(1, Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
  /*
   * Test sort partition on field expression
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(3);
  DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper<POJO>()).setParallelism(1) // parallelize input
      .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
      .sortPartition("number", Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new PojoChecker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testSortPartitionByFieldExpression() throws Exception {
  /*
   * Test sort partition on field expression
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper()).setParallelism(4) // parallelize input
      .sortPartition("f1", Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionWithKeySelector2() throws Exception {
  /*
   * Test sort partition on an extracted key
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
    .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
    .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
      @Override
      public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) throws Exception {
        return new Tuple2<>(value.f0, value.f1);
      }
    }, Order.DESCENDING)
    .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
    .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionByNestedFieldExpression() throws Exception {
  /*
   * Test sort partition on nested field expressions
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(3);
  DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input
      .sortPartition("f0.f1", Order.ASCENDING)
      .sortPartition("f1", Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new NestedTupleChecker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionByTwoFieldExpressions() throws Exception {
  /*
   * Test sort partition on two field expressions
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
      .sortPartition("f4", Order.ASCENDING)
      .sortPartition("f2", Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Test
public void testSortPartitionByTwoKeyFields() throws Exception {
  /*
   * Test sort partition on two key fields
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
  List<Tuple1<Boolean>> result = ds
      .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
      .sortPartition(4, Order.ASCENDING)
      .sortPartition(2, Order.DESCENDING)
      .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
      .distinct().collect();
  String expected = "(true)\n";
  compareResultAsText(result, expected);
}
