Usage of the org.apache.flink.api.java.operators.UnsortedGrouping class, with code examples


This article collects Java code examples for org.apache.flink.api.java.operators.UnsortedGrouping and shows how the class is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they make useful references. Details of the UnsortedGrouping class:
Package path: org.apache.flink.api.java.operators.UnsortedGrouping
Class name: UnsortedGrouping

About UnsortedGrouping

From the Javadoc: "A Grouping that is unsorted."
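An UnsortedGrouping is never constructed directly: it is what DataSet#groupBy returns, and it only turns back into a DataSet once a group-wise operation (reduce, reduceGroup, aggregate, first, sortGroup, ...) is chained onto it. A minimal self-contained sketch, with invented data:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnsortedGroupingExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> words = env.fromElements(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));
    // groupBy(0) yields an UnsortedGrouping keyed on the String field
    UnsortedGrouping<Tuple2<String, Integer>> grouped = words.groupBy(0);
    // maxBy(1) keeps, per group, the element with the largest field-1 value
    grouped.maxBy(1).print(); // e.g. (a,3) and (b,2), in no guaranteed order
  }
}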

Code examples

Code example source: apache/flink

/**
 * Returns a new set containing the first n elements in this grouped {@link DataSet}.
 *
 * @param n The desired number of elements for each group.
 * @return A GroupReduceOperator that represents the DataSet containing the elements.
*/
public GroupReduceOperator<T, T> first(int n) {
  if (n < 1) {
    throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
  }
  return reduceGroup(new FirstReducer<T>(n));
}
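A quick usage sketch for first(n) on a grouped DataSet (data invented): each group contributes at most n elements, and because the grouping is unsorted there is no guarantee which n are picked.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class FirstNExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> ds = env.fromElements(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("a", 3),
        new Tuple2<>("b", 4));
    // at most two arbitrary elements survive per key
    ds.groupBy(0).first(2).print();
  }
}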

Code example source: apache/flink

private void createSortOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    throw new IllegalArgumentException("sort() can not be applied on a DataSet.");
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    sets.add(info.setID, sets.getUnsortedGrouping(info.parentID).sortGroup(info.field, info.order));
  } else if (sets.isSortedGrouping(info.parentID)) {
    sets.add(info.setID, sets.getSortedGrouping(info.parentID).sortGroup(info.field, info.order));
  }
}

Code example source: apache/flink

/**
 * Return the degree of all vertices in the graph.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
  return outDegrees()
    .union(inDegrees()).name("In- and out-degree")
    .groupBy(0).sum(1).name("Sum");
}
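A minimal sketch of calling getDegrees(), assuming the flink-gelly dependency is on the classpath (edge data invented):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class DegreesExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // edges 1->2 and 2->3: vertex 2 ends up with degree 2, vertices 1 and 3 with degree 1
    Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(
        env.fromElements(
            new Edge<>(1L, 2L, NullValue.getInstance()),
            new Edge<>(2L, 3L, NullValue.getInstance())),
        env);
    graph.getDegrees().print(); // (1,1), (2,2), (3,1)
  }
}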

Code example source: apache/flink

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
  DataSet<Edge<K, EV>> edges = input.getEdges();
  // annotate edges with degrees
  DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
      .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
  // project edges by degrees
  DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
  // project edges by vertex id
  DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
  DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
      // build triads
      .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
      .reduceGroup(new TriadBuilder<>())
      // filter triads
      .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
  return triangles;
}

Code example source: apache/flink

@Test
public void testFirstNOnGroupedDS() throws Exception {
  /*
   * First-n on grouped data set
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
      .map(new OneMapper2()).groupBy(0).sum(1);
  List<Tuple2<Long, Integer>> result = first.collect();
  String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
  compareResultAsText(result, expected);
}

Code example source: apache/flink

private static <T, B extends CopyableIterator<T>> void testReducePerformance
  (B iterator, TypeInformation<T> typeInfo, CombineHint hint, int numRecords, boolean print) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().enableObjectReuse();
  @SuppressWarnings("unchecked")
  DataSet<T> output =
    env.fromParallelCollection(new SplittableRandomIterator<T, B>(numRecords, iterator), typeInfo)
      .groupBy("0")
      .reduce(new SumReducer()).setCombineHint(hint);
  long start = System.currentTimeMillis();
  System.out.println(output.count());
  long end = System.currentTimeMillis();
  if (print) {
    System.out.println("=== Time for " + iterator.getClass().getSimpleName() + " with hint " + hint.toString() + ": " + (end - start) + "ms ===");
  }
}

Code example source: apache/flink

/**
 * Syntactic sugar for aggregate (MIN, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the min'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> min (int field) {
  return this.aggregate (Aggregations.MIN, field, Utils.getCallLocationName());
}
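The max(int) and sum(int) shortcuts shown further below follow exactly the same pattern. A combined usage sketch for all three (values invented; grouping on field 0 keeps the key stable in the output):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupedAggregationsExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> ds = env.fromElements(
        new Tuple2<>("a", 3), new Tuple2<>("a", 5), new Tuple2<>("b", 7));
    ds.groupBy(0).min(1).print(); // (a,3), (b,7)
    ds.groupBy(0).max(1).print(); // (a,5), (b,7)
    ds.groupBy(0).sum(1).print(); // (a,8), (b,7)
  }
}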

Code example source: apache/flink

public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
  DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
      
  DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
      .projectSecond(1);
  
  DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
  
  DataSet<Tuple2<Long, Long>> candidatesDependencies = 
      grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);
  
  DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
      candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
      .with(new Join222())
      .groupBy(0).aggregate(Aggregations.MIN, 1);
  
  DataSet<Tuple2<Long, Long>> updatedComponentId = 
      verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
      .flatMap(new FlatMapJoin());
  
  DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
  return depResult;
  
}

Code example source: apache/flink

@Test
public void testCustomPartitioningTupleReduce() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
        .rebalance().setParallelism(4);
    
    data.groupBy(0).withPartitioner(new TestPartitionerInt())
      .reduce(new SelectOneReducer<Tuple2<Integer,Integer>>())
      .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
    
    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);
    
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
    
    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}
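TestPartitionerInt itself is not part of the snippet; a hypothetical stand-in (the actual class in Flink's tests may differ) only needs to implement Flink's Partitioner interface with some deterministic key-to-partition mapping:

import org.apache.flink.api.common.functions.Partitioner;

// hypothetical stand-in for the TestPartitionerInt referenced above
public class TestPartitionerInt implements Partitioner<Integer> {
  @Override
  public int partition(Integer key, int numPartitions) {
    return Math.abs(key % numPartitions);
  }
}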

Code example source: apache/flink

@Test
public void testCustomPartitioningTupleAgg() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
        .rebalance().setParallelism(4);
    
    data.groupBy(0).withPartitioner(new TestPartitionerInt())
      .sum(1)
      .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
    
    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);
    
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
    
    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

Code example source: apache/flink

try {
  tupleDs.groupBy(2).first(1);      // should work
} catch (Exception e) {
  Assert.fail();
}
try {
  tupleDs.groupBy(1, 3).first(10);  // should work
} catch (Exception e) {
  Assert.fail();
}
try {
  tupleDs.groupBy(0).first(0);      // should not work: n == 0
  Assert.fail();
} catch (InvalidProgramException ipe) {
  // expected
}
try {
  tupleDs.groupBy(2).first(-1);     // should not work: n == -1
  Assert.fail();
} catch (InvalidProgramException ipe) {
  // expected
}

Code example source: apache/flink

public void testGroupedReduce(ExecutionEnvironment env) throws Exception {
  /*
   * Test ReduceCombineDriver and ReduceDriver
   */
  LOG.info("Testing grouped reduce");
  env.getConfig().enableObjectReuse();
  List<Tuple2<IntValue, IntValue>> enabledResult = getDataSet(env)
    .groupBy(0)
    .reduce(new OverwriteObjectsReduce(true))
    .collect();
  Collections.sort(enabledResult, comparator);
  env.getConfig().disableObjectReuse();
  List<Tuple2<IntValue, IntValue>> disabledResult = getDataSet(env)
    .groupBy(0)
    .reduce(new OverwriteObjectsReduce(true))
    .collect();
  Collections.sort(disabledResult, comparator);
  Assert.assertThat(disabledResult, is(enabledResult));
}

Code example source: apache/flink

/**
 * Syntactic sugar for aggregate (MAX, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the max'ed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> max (int field) {
  return this.aggregate (Aggregations.MAX, field, Utils.getCallLocationName());
}

Code example source: apache/flink

@Test
public void testCustomPartitioningTupleReduce() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Pojo2> data = env.fromElements(new Pojo2())
        .rebalance().setParallelism(4);
    data.groupBy("a").withPartitioner(new TestPartitionerInt())
        .reduce(new SelectOneReducer<Pojo2>())
        .output(new DiscardingOutputFormat<Pojo2>());
    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

Code example source: apache/flink

private <K extends Tuple> void createFirstOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    DataSet<byte[]> op = sets.getDataSet(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First"));
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  } else if (sets.isSortedGrouping(info.parentID)) {
    SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
    sets.add(info.setID, op
      .first(info.count).setParallelism(info.parallelism).name("First")
      .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
  }
}

Code example source: apache/flink

private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
    return op1
      .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
      .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
      .setParallelism(info.parallelism).name(info.name);
  }

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields4() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy(0)
      .sortGroup(2, Order.ASCENDING);
}
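The test documents what is rejected: group-sorting on a field (here the CustomType at position 2) for which no order is defined. For contrast, a hedged sketch of a valid sortGroup call on an atomic field (data invented):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortGroupExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> ds = env.fromElements(
        new Tuple2<>("a", 3), new Tuple2<>("a", 1), new Tuple2<>("b", 2));
    // sorting on the int field is allowed; first(1) then yields each group's minimum
    ds.groupBy(0).sortGroup(1, Order.ASCENDING).first(1).print(); // (a,1), (b,2)
  }
}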

Code example source: apache/flink

private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    env.readTextFile(inFile.getAbsolutePath())
      .flatMap(new Tokenizer())
      .groupBy(0)
      .sum(1)
      .writeAsCsv(outFile.getAbsolutePath());
    return env.createProgramPlan();
  }
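The Tokenizer this plan relies on is not shown; a hypothetical version (the class actually used by the test may differ) just has to emit (word, 1) pairs so that groupBy(0).sum(1) can count them:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// hypothetical Tokenizer: splits each line into lower-cased words paired with a count of 1
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
  @Override
  public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
    for (String token : line.toLowerCase().split("\\W+")) {
      if (!token.isEmpty()) {
        out.collect(new Tuple2<>(token, 1));
      }
    }
  }
}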

Code example source: apache/flink

@Override
protected void testProgram() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<String> text = env.readTextFile(textPath);
  DataSet<WC> counts = text
      .flatMap(new Tokenizer())
      .groupBy("complex.someTest")
      .reduce(new ReduceFunction<WC>() {
        private static final long serialVersionUID = 1L;
        public WC reduce(WC value1, WC value2) {
          return new WC(value1.complex.someTest, value1.count + value2.count);
        }
      });
  counts.writeAsText(resultPath);
  env.execute("WordCount with custom data types example");
}

Code example source: apache/flink

/**
 * Syntactic sugar for aggregate (SUM, field).
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum (int field) {
  return this.aggregate (Aggregations.SUM, field, Utils.getCallLocationName());
}
