org.apache.flink.api.java.operators.GroupReduceOperator.returns()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(109)

本文整理了Java中org.apache.flink.api.java.operators.GroupReduceOperator.returns()方法的一些代码示例,展示了GroupReduceOperator.returns()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。GroupReduceOperator.returns()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.GroupReduceOperator
类名称:GroupReduceOperator
方法名:returns

GroupReduceOperator.returns介绍

暂无

代码示例

代码示例来源:origin: apache/flink

.withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
  return edges.flatMap(new EmitOneEdgePerNode<>()).name("Emit edge")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: apache/flink

/**
 * Reads Avro {@code User} records with Kryo forced on the execution config, groups them by
 * name through a key selector, and emits one {@code (name, 1)} tuple per record.
 *
 * <p>The output type of the lambda reducer is supplied explicitly via {@code returns(...)}.
 * The job result is written as text to {@code resultPath}; {@code expected} is only assigned
 * here (presumably compared by the surrounding test fixture - confirm against the enclosing class).
 */
@Test
public void testWithKryoGenericSer() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().enableForceKryo();

  final Path avroPath = new Path(inFile.getAbsoluteFile().toURI());
  final AvroInputFormat<User> avroFormat = new AvroInputFormat<>(avroPath, User.class);
  final DataSet<User> userRecords = env.createInput(avroFormat);

  // Key on the textual form of the user's name.
  final KeySelector<User, String> nameKey = user -> String.valueOf(user.getName());
  // Emit a (name, 1) pair for every record of a group.
  final GroupReduceFunction<User, Tuple2<String, Integer>> onePerUser = (group, collector) -> {
    for (User member : group) {
      collector.collect(new Tuple2<>(member.getName().toString(), 1));
    }
  };

  final DataSet<Tuple2<String, Integer>> nameCounts = userRecords
    .groupBy(nameKey)
    .reduceGroup(onePerUser)
    .returns(Types.TUPLE(Types.STRING, Types.INT));
  nameCounts.writeAsText(resultPath);
  env.execute("Avro Key selection");

  expected = "(Charlie,1)\n(Alyssa,1)\n";
}

代码示例来源:origin: apache/flink

/**
 * Reads Avro {@code User} records with Avro forced on the execution config, groups them by
 * name through a key selector, and emits one {@code (name, 1)} tuple per record.
 *
 * <p>The output type of the lambda reducer is supplied explicitly via {@code returns(...)}.
 * The job result is written as text to {@code resultPath}; {@code expected} is only assigned
 * here (presumably compared by the surrounding test fixture - confirm against the enclosing class).
 */
@Test
public void testWithAvroGenericSer() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().enableForceAvro();

  final Path avroPath = new Path(inFile.getAbsoluteFile().toURI());
  final AvroInputFormat<User> avroFormat = new AvroInputFormat<>(avroPath, User.class);
  final DataSet<User> userRecords = env.createInput(avroFormat);

  // Key on the textual form of the user's name.
  final KeySelector<User, String> nameKey = user -> String.valueOf(user.getName());
  // Emit a (name, 1) pair for every record of a group.
  final GroupReduceFunction<User, Tuple2<String, Integer>> onePerUser = (group, collector) -> {
    for (User member : group) {
      collector.collect(new Tuple2<>(member.getName().toString(), 1));
    }
  };

  final DataSet<Tuple2<String, Integer>> nameCounts = userRecords
    .groupBy(nameKey)
    .reduceGroup(onePerUser)
    .returns(Types.TUPLE(Types.STRING, Types.INT));
  nameCounts.writeAsText(resultPath);
  env.execute("Avro Key selection");

  expected = "(Charlie,1)\n(Alyssa,1)\n";
}

代码示例来源:origin: apache/flink

return edgesWithSources.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case OUT:
  return edgesWithTargets.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case ALL:
      .name("Neighbors function").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: apache/flink

/**
 * Reads Avro {@code User} records with object reuse enabled, groups them by the {@code "name"}
 * field expression, and emits one {@code (name, 1)} tuple per record.
 *
 * <p>The output type of the lambda reducer is supplied explicitly via {@code returns(...)}.
 * The job result is written as text to {@code resultPath}; {@code expected} is only assigned
 * here (presumably compared by the surrounding test fixture - confirm against the enclosing class).
 */
@Test
public void testKeySelection() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().enableObjectReuse();

  final Path avroPath = new Path(inFile.getAbsoluteFile().toURI());
  final AvroInputFormat<User> avroFormat = new AvroInputFormat<>(avroPath, User.class);
  final DataSet<User> userRecords = env.createInput(avroFormat);

  // Emit a (name, 1) pair for every record of a group.
  final GroupReduceFunction<User, Tuple2<String, Integer>> onePerUser = (group, collector) -> {
    for (User member : group) {
      collector.collect(new Tuple2<>(member.getName().toString(), 1));
    }
  };

  final DataSet<Tuple2<String, Integer>> nameCounts = userRecords
    .groupBy("name")
    .reduceGroup(onePerUser)
    .returns(Types.TUPLE(Types.STRING, Types.INT));
  nameCounts.writeAsText(resultPath);
  env.execute("Avro Key selection");

  expected = "(Alyssa,1)\n(Charlie,1)\n";
}

代码示例来源:origin: apache/flink

/**
 * Groups the small 3-tuple data set on field 0, applies {@code GroupReducer}, and supplies the
 * result type through {@code returns(BasicTypeInfo.INT_TYPE_INFO)}.
 *
 * <p>The collected output is checked against the expected text with
 * {@code compareResultAsText}.
 */
@Test
public void testUnsortedGroupReduceWithTypeInformationTypeHint() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  final DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
  final GroupReducer<Tuple3<Integer, Long, String>, Integer> reducer = new GroupReducer<>();

  final DataSet<Integer> reduced = tuples
    .groupBy(0)
    .reduceGroup(reducer)
    .returns(BasicTypeInfo.INT_TYPE_INFO);

  final List<Integer> actual = reduced.collect();
  final String expectedText = "2\n" + "3\n" + "1\n";
  compareResultAsText(actual, expectedText);
}

代码示例来源:origin: apache/flink

.returns(Object.class);
res.writeAsText(resultPath);
env.execute("Simple Avro read job");

代码示例来源:origin: apache/flink

.groupBy((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "")
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
.returns(String.class)
.output(new DiscardingOutputFormat<>());

代码示例来源:origin: apache/flink

/**
 * Groups the small 3-tuple data set on field 0, sorts each group ascending on field 0,
 * applies {@code GroupReducer}, and supplies the result type through
 * {@code returns(BasicTypeInfo.INT_TYPE_INFO)}.
 *
 * <p>The collected output is checked against the expected text with
 * {@code compareResultAsText}.
 */
@Test
public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  final DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
  final GroupReducer<Tuple3<Integer, Long, String>, Integer> reducer = new GroupReducer<>();

  final DataSet<Integer> reduced = tuples
    .groupBy(0)
    .sortGroup(0, Order.ASCENDING)
    .reduceGroup(reducer)
    .returns(BasicTypeInfo.INT_TYPE_INFO);

  final List<Integer> actual = reduced.collect();
  final String expectedText = "2\n" + "3\n" + "1\n";
  compareResultAsText(actual, expectedText);
}

代码示例来源:origin: apache/flink

.sortGroup((KeySelector<Tuple3<Double, StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>) (values, out) -> {})
.returns(String.class)
.output(new DiscardingOutputFormat<>());

代码示例来源:origin: apache/flink

.returns(Types.TUPLE(Types.INT, Types.INT))

代码示例来源:origin: org.apache.flink/flink-gelly_2.10

.withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<K, EV>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
  return edges.flatMap(new EmitOneEdgePerNode<K, EV>()).name("Emit edge")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: com.alibaba.blink/flink-gelly

.withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
  return edges.flatMap(new EmitOneEdgePerNode<>()).name("Emit edge")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: org.apache.flink/flink-gelly_2.11

.withForwardedFields("f1->f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in-edges").returns(typeInfo);
case OUT:
  return edges.map(new ProjectVertexIdMap<>(0)).name("Vertex ID")
      .withForwardedFields("f0")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on out-edges").returns(typeInfo);
case ALL:
  return edges.flatMap(new EmitOneEdgePerNode<>()).name("Emit edge")
      .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction))
        .name("GroupReduce on in- and out-edges").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: dataArtisans/cascading-flink

.returns(new TupleTypeInfo(outFields))
.withParameters(this.getFlinkNodeConfig(node))
.setParallelism(dop)
.returns(new TupleTypeInfo(outFields))
.withParameters(this.getFlinkNodeConfig(node))
.setParallelism(dop)

代码示例来源:origin: org.apache.flink/flink-gelly_2.10

return edgesWithSources.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case OUT:
  return edgesWithTargets.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case ALL:
      .name("Neighbors function").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: com.alibaba.blink/flink-gelly

return edgesWithSources.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case OUT:
  return edgesWithTargets.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case ALL:
      .name("Neighbors function").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: org.apache.flink/flink-gelly_2.11

return edgesWithSources.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case OUT:
  return edgesWithTargets.groupBy(0).reduceGroup(
    new ApplyNeighborGroupReduceFunction<>(neighborsFunction))
      .name("Neighbors function").returns(typeInfo);
case ALL:
      .name("Neighbors function").returns(typeInfo);
default:
  throw new IllegalArgumentException("Illegal edge direction");

代码示例来源:origin: dataArtisans/cascading-flink

/**
 * Translates a group-by without grouping keys: optionally sorts the input on the given sort
 * keys, then runs a {@code GroupByReducer} over the whole data set.
 *
 * @param input     data set to reduce
 * @param node      flow node; supplies the reducer, the operator name suffix and the config
 * @param dop       parallelism set on the reduce operator
 * @param sortKeys  keys to sort on before reducing; may be {@code null} or empty for no sort
 * @param sortOrder order applied to every sort key
 * @param outFields fields used to build the result's {@code TupleTypeInfo}
 * @return the reduce operator's result, named {@code "reduce-" + node.getID()}
 */
private DataSet<Tuple> translateGlobalGroupBy(DataSet<Tuple> input, FlowNode node, int dop,
                        String[] sortKeys, Order sortOrder, Fields outFields) {
  DataSet<Tuple> sorted = input;

  final boolean hasSortKeys = sortKeys != null && sortKeys.length > 0;
  if (hasSortKeys) {
    // The first sort key starts the sort; only this operator gets a name.
    sorted = sorted
        .sortPartition(sortKeys[0], sortOrder)
        .setParallelism(1)
        .name("reduce-"+ node.getID());

    // Remaining sort keys are applied in order, each with parallelism 1.
    int keyIdx = 1;
    while (keyIdx < sortKeys.length) {
      sorted = sorted
          .sortPartition(sortKeys[keyIdx], sortOrder)
          .setParallelism(1);
      keyIdx++;
    }
  }

  // Reduce everything as one group (no grouping keys).
  return sorted
      .reduceGroup(new GroupByReducer(node))
      .returns(new TupleTypeInfo(outFields))
      .withParameters(this.getFlinkNodeConfig(node))
      .setParallelism(dop)
      .name("reduce-"+ node.getID());
}

代码示例来源:origin: dataArtisans/cascading-flink

.groupBy(groupKeys)
.reduceGroup(new GroupByReducer(node))
.returns(new TupleTypeInfo(outFields))
.withParameters(this.getFlinkNodeConfig(node))
.setParallelism(dop)

相关文章