Usage of the org.apache.flink.api.java.operators.GroupReduceOperator.name() method, with code examples


This article collects Java code examples that show how the org.apache.flink.api.java.operators.GroupReduceOperator.name() method is used. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they serve as fairly representative references. Details of GroupReduceOperator.name() are as follows:
Package path: org.apache.flink.api.java.operators.GroupReduceOperator
Class name: GroupReduceOperator
Method name: name

About GroupReduceOperator.name

name(String) assigns a custom name to the operator, overriding the default name derived from the user function's class. The name appears in log output and in the execution plan / web UI, and the method returns the operator itself, so calls can be chained fluently, as every example below does.
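Before the harvested examples, here is a minimal, self-contained sketch of typical name() usage. It is illustrative only: the class name NameExample and the operator labels are made up for this sketch, which assumes a standard Flink DataSet setup.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class NameExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // reduceGroup(...) returns a GroupReduceOperator; name(...) labels the
    // operator for logs and the web UI and returns it, so chaining continues.
    DataSet<Long> sum = env.generateSequence(1, 100).name("source")
      .reduceGroup(new GroupReduceFunction<Long, Long>() {
        @Override
        public void reduce(Iterable<Long> values, Collector<Long> out) {
          long total = 0;
          for (Long v : values) {
            total += v;
          }
          out.collect(total);
        }
      })
      .name("SumAll"); // the GroupReduceOperator.name() call

    sum.print(); // triggers execution and prints 5050
  }
}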

Code examples

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism)
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
  return op1
    .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
    .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
    .setParallelism(info.parallelism).name(info.name);
}

Code example source: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    DataSet<byte[]> op = sets.getDataSet(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First"));
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  } else if (sets.isSortedGrouping(info.parentID)) {
    SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  }
}

Code example source: apache/flink

@Override
public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
    throws Exception {
  // s, t, bitmask
  DataSet<Tuple2<K, ByteValue>> vertexWithEdgeOrder = input.getEdges()
    .flatMap(new EmitAndFlipEdge<>())
      .setParallelism(parallelism)
      .name("Emit and flip edge")
    .groupBy(0, 1)
    .reduceGroup(new ReduceBitmask<>())
      .setParallelism(parallelism)
      .name("Reduce bitmask");
  // s, d(s)
  DataSet<Vertex<K, Degrees>> vertexDegrees = vertexWithEdgeOrder
    .groupBy(0)
    .reduceGroup(new DegreeCount<>())
      .setParallelism(parallelism)
      .name("Degree count");
  if (includeZeroDegreeVertices.get()) {
    vertexDegrees = input.getVertices()
      .leftOuterJoin(vertexDegrees)
      .where(0)
      .equalTo(0)
      .with(new JoinVertexWithVertexDegrees<>())
        .setParallelism(parallelism)
        .name("Zero degree vertices");
  }
  return vertexDegrees;
}

Code example source: apache/flink

}).name("reducer")
.output(new DiscardingOutputFormat<Double>()).name("sink");

Code example source: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<String> initialInput = env.fromElements("1", "2", "3", "4", "5").name("input");
  IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
  DataSet<String> sumReduce = iteration.reduceGroup(new SumReducer()).name("Compute sum (GroupReduce)");
  DataSet<String> terminationFilter = iteration.filter(new TerminationFilter()).name("Compute termination criterion (Map)");
  List<String> result = iteration.closeWith(sumReduce, terminationFilter).collect();
  containsResultAsText(result, EXPECTED);
}

Code example source: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);
  DataSet<String> initialInput = env.fromElements("1", "2", "3", "4", "5").name("input");
  IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
  DataSet<String> sumReduce = iteration.reduceGroup(new SumReducer()).name("Compute sum (GroupReduce)");
  DataSet<String> terminationFilter = sumReduce.filter(new TerminationFilter()).name("Compute termination criterion (Map)");
  List<String> result = iteration.closeWith(sumReduce, terminationFilter).collect();
  containsResultAsText(result, EXPECTED);
}

Code example source: apache/flink

/**
 * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
 * |--------------------------/                  /
 * |--------------------------------------------/
 *
 * First cross has SameKeyFirst output contract
 */
@Test
public void testTicket158() {
  // construct the plan
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> set1 = env.generateSequence(0, 1);

  set1.map(new IdentityMapper<Long>()).name("Map1")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
      .cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
      .cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
      .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
      .output(new DiscardingOutputFormat<Long>()).name("Sink");

  Plan plan = env.createProgramPlan();
  OptimizedPlan oPlan = compileNoStats(plan);

  JobGraphGenerator jobGen = new JobGraphGenerator();
  jobGen.compileJobGraph(oPlan);
}

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
    .run(new EdgeDegreesPair<K, VV, EV>()
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
    .flatMap(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0, 1)
    .reduceGroup(new ReduceEdgeStats<>())
      .setParallelism(parallelism)
      .name("Reduce edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: apache/flink

@Test
public void testReduce() {
  // construct the plan
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);
  DataSet<Long> set1 = env.generateSequence(0, 1);

  set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
      .output(new DiscardingOutputFormat<Long>()).name("Sink");

  Plan plan = env.createProgramPlan();

  try {
    OptimizedPlan oPlan = compileNoStats(plan);
    JobGraphGenerator jobGen = new JobGraphGenerator();
    jobGen.compileJobGraph(oPlan);
  } catch (CompilerException ce) {
    ce.printStackTrace();
    fail("The pact compiler is unable to compile this plan correctly");
  }
}

Code example source: apache/flink

DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
DataSet<Long> reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 2");

Code example source: apache/flink

.withForwardedFields("*").setParallelism(p).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");

Code example source: apache/flink

.withForwardedFields("*").setParallelism(p * 2).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
.map(new IdentityMapper<Long>())
  .withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");

Code example source: apache/flink

public static void tcph3(String[] args) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(Integer.parseInt(args[0]));
  //order id, order status, order date, order prio, ship prio
  DataSet<Tuple5<Long, String, String, String, Integer>> orders =
      env.readCsvFile(args[1])
      .fieldDelimiter("|").lineDelimiter("\n")
      .includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
      .name(ORDERS);
  //order id, extended price
  DataSet<Tuple2<Long, Double>> lineItems =
      env.readCsvFile(args[2])
      .fieldDelimiter("|").lineDelimiter("\n")
      .includeFields("100001").types(Long.class, Double.class)
      .name(LINEITEM);
  DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
  DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
  DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);
  aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
  env.execute();
}

Code example source: apache/flink

newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);

Code example source: apache/flink

.reduceGroup(new IdentityGroupReducer<Tuple4<Long, Long, Long, Long>>()).name("Reduce")
.output(new DiscardingOutputFormat<Tuple4<Long, Long, Long, Long>>()).name("Sink");

Code example source: apache/flink

.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
