Usage of org.apache.flink.api.java.operators.MapOperator.groupBy() with code examples


This article collects Java code examples of the org.apache.flink.api.java.operators.MapOperator.groupBy() method and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, extracted from selected open-source projects, and should serve as useful references. Details of MapOperator.groupBy() follow:
Package: org.apache.flink.api.java.operators
Class name: MapOperator
Method name: groupBy

About MapOperator.groupBy

MapOperator is the DataSet returned by a map transformation, and groupBy is inherited from DataSet: it groups the records on one or more keys (tuple field positions via groupBy(int...), field expressions via groupBy(String...), or a KeySelector) and returns an UnsortedGrouping, on which grouped transformations such as reduce, reduceGroup, sum, or first can be applied.
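A minimal sketch of the typical call pattern (the data and the key choice here are illustrative, not taken from the snippets below):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupBySketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.generateSequence(1, 100)
        // map() returns a MapOperator; lambdas need an explicit result type
        .map(i -> new Tuple2<>(i % 10, 1L))
        .returns(Types.TUPLE(Types.LONG, Types.LONG))
        .groupBy(0) // group on the first tuple field (the key)
        .sum(1)     // aggregate the counts within each group
        .print();
  }
}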

Code examples

Code example source: apache/flink

@Test
public void testForcedRebalancing() throws Exception {
  /*
   * Test forced rebalancing
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  // generate some numbers in parallel
  DataSet<Long> ds = env.generateSequence(1, 3000);
  DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
      // introduce some partition skew by filtering
      .filter(new Filter1())
      // rebalance
      .rebalance()
      // count values in each partition
      .map(new PartitionIndexMapper())
      .groupBy(0)
      .reduce(new Reducer1())
      // round counts to mitigate runtime scheduling effects (lazy split assignment)
      .map(new Mapper1());
  List<Tuple2<Integer, Integer>> result = uniqLongs.collect();
  StringBuilder expected = new StringBuilder();
  int numPerPartition = 2220 / env.getParallelism() / 10;
  for (int i = 0; i < env.getParallelism(); i++) {
    expected.append('(').append(i).append(',')
        .append(numPerPartition).append(")\n");
  }
  compareResultAsText(result, expected.toString());
}

Code example source: apache/flink

// Truncated fragment: branches of a switch over the edge direction in
// Gelly's Graph reduce-on-edges code; the map(...) call that opens each
// branch's chain is elided in this snippet.
      .withForwardedFields("f1->f0")
        .name("Vertex with in-edges")
      .groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
        .name("Reduce on edges");
case OUT:
      .withForwardedFields("f0->f0")
        .name("Vertex with out-edges")
      .groupBy(0).reduce(new ApplyReduceFunction<>(reduceEdgesFunction))
        .name("Reduce on edges");
case ALL:

Code example source: apache/flink

// Truncated fragment: switch branches over the edge direction; each branch
// projects the relevant vertex id before the grouped reduceGroup.
return edges.map(new ProjectVertexIdMap<>(1)).name("Vertex ID")
      .withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:

Code example source: apache/flink

@Test
public void testFirstNOnGroupedDS() throws Exception {
  /*
   * First-n on grouped data set
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
      .map(new OneMapper2()).groupBy(0).sum(1);
  List<Tuple2<Long, Integer>> result = first.collect();
  String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

@Override
public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
    throws Exception {
  super.run(input);
  // s, t, (d(s), d(t))
  DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
    .run(new EdgeDegreePair<K, VV, EV>()
      .setReduceOnTargetId(reduceOnTargetId)
      .setParallelism(parallelism));
  // s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
  DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
    .map(new EdgeStats<>())
      .setParallelism(parallelism)
      .name("Edge stats")
    .groupBy(0)
    .reduce(new SumEdgeStats<>())
    .setCombineHint(CombineHint.HASH)
      .setParallelism(parallelism)
      .name("Sum edge stats");
  edgeMetricsHelper = new EdgeMetricsHelper<>();
  edgeStats
    .output(edgeMetricsHelper)
      .setParallelism(parallelism)
      .name("Edge metrics");
  return this;
}

Code example source: apache/flink

// Fragment (KMeans example): per-centroid accumulation of point sums and counts.
.groupBy(0).reduce(new CentroidAccumulator())
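The fragment above is the accumulation step of Flink's KMeans example, where the points assigned to each centroid are summed before averaging. A simplified, self-contained sketch of the same groupBy/reduce/average pattern (the 1-D data and the tuple layout are illustrative, not the example's actual Point/Centroid types):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class CentroidUpdateSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // (centroid id, coordinate sum, point count) for each assigned point
    DataSet<Tuple3<Integer, Double, Long>> assigned = env.fromElements(
        new Tuple3<>(0, 1.0, 1L), new Tuple3<>(0, 3.0, 1L), new Tuple3<>(1, 10.0, 1L));

    assigned
        .groupBy(0) // group by centroid id, as in the fragment above
        .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
        .map(t -> new Tuple3<>(t.f0, t.f1 / t.f2, 1L)) // average -> new centroid
        .returns(Types.TUPLE(Types.INT, Types.DOUBLE, Types.LONG))
        .print();
  }
}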

Code example source: apache/flink

// Truncated fragment: two grouped reduceGroup chains over fields (0, 1);
// the operators between the two chains are elided in this snippet.
.setParallelism(parallelism)
  .name("Order by ID")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
  .setParallelism(parallelism)
// ...
  .setParallelism(parallelism)
  .name("Order by degree")
.groupBy(0, 1)
.reduceGroup(new ReduceBitmask<>())
  .setParallelism(parallelism)

Code example source: apache/flink

private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
    for (int i = 0; i < branches; i++) {
      final int ii = i;

      if (stats != null) {
        input = input.map(
          new RichMapFunction<String, String>() {
            @Override
            public String map(String value) {
              return value;
            }
        }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
      }

      DataSet<String> branch = input
        // the tuple is (Integer, String), so the type hint must match that order
        .map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))
        .groupBy(0)
        .minBy(1)
        .map(kv -> kv.f1).returns(Types.STRING);
      if (stats == null) {
        stats = branch;
      } else {
        stats = stats.union(branch);
      }
    }
    return stats.map(s -> "(" + s + ").stats");
  }

Code example source: apache/flink

.groupBy(0).reduce(new KMeans.CentroidAccumulator())

Code example source: apache/flink

DataSet<Tuple3<Integer, Integer, Integer>> set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
set = set.map(new MockMapper()).withForwardedFields("*")
    .groupBy(0)
    .reduce(new MockReducer()).withForwardedFields("f0->f1")
    .map(new MockMapper()).withForwardedFields("*")
    .groupBy(1)
    .reduce(new MockReducer()).withForwardedFields("*");

Code example source: apache/flink

.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")

Code example source: apache/flink

.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
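Several of these snippets group with the field expression "*", which keys on the entire record instead of a single tuple field. A minimal sketch under that reading (the identity map and the sample data are illustrative):

import org.apache.flink.api.java.ExecutionEnvironment;

public class GroupByAllSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 2L, 3L)
        .map(x -> x).returns(Long.class) // a MapOperator, as in the snippets
        .groupBy("*")                    // group on the whole record
        .reduce((a, b) -> a)             // keep one representative per group
        .print();                        // prints 1, 2, 3 (in some order)
  }
}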

Code example source: apache/flink

DataSet<Tuple3<Integer, Integer, Integer>> in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
in1 = in1.map(new MockMapper()).withForwardedFields("*")
    .groupBy(0)
    .reduce(new MockReducer()).withForwardedFields("f0->f1");
in2 = in2.map(new MockMapper()).withForwardedFields("*")
    .groupBy(1)
    .reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());

Code example source: apache/flink

.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p).name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");

Code example source: apache/flink

.map(new IdentityMapper<Long>())
  .withForwardedFields("*").setParallelism(p * 2).name("Map1")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
.map(new IdentityMapper<Long>())
  .withForwardedFields("*").setParallelism(p).name("Map2")
.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
  .withForwardedFields("*").setParallelism(p).name("Reduce2")
.output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");

Code example source: apache/flink

/**
   * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce ->
   * |--------------------------/                  /
   * |--------------------------------------------/
   * 
   * First cross has SameKeyFirst output contract
   */
  @Test
  public void testTicket158() {
    // construct the plan
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(DEFAULT_PARALLELISM);
    DataSet<Long> set1 = env.generateSequence(0,1);

    set1.map(new IdentityMapper<Long>()).name("Map1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
        .cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
        .cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
        .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
        .output(new DiscardingOutputFormat<Long>()).name("Sink");

    Plan plan = env.createProgramPlan();
    OptimizedPlan oPlan = compileNoStats(plan);

    JobGraphGenerator jobGen = new JobGraphGenerator();
    jobGen.compileJobGraph(oPlan);
  }
}

Code example source: apache/flink

// Fragment (WordCount): group by word (field 0) and sum the per-word counts.
.groupBy(0).sum(1).name("Count Words")
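The one-line fragment above is the aggregation step of the classic WordCount program. A self-contained sketch of a surrounding program (the Tokenizer class and the sample input are illustrative, not taken from the snippet's source):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.fromElements("to be or not to be")
        .flatMap(new Tokenizer())     // emit (word, 1) pairs
        .groupBy(0)                   // group by the word field
        .sum(1).name("Count Words")   // sum the counts per word
        .print();
  }

  // Splits each line into lowercase words and emits (word, 1).
  public static final class Tokenizer
      implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
      for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
          out.collect(new Tuple2<>(word, 1));
        }
      }
    }
  }
}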
