Usage and code examples of the org.apache.flink.api.java.operators.DataSink.sortLocalOutput() method

x33g5p2x, reposted on 2022-01-18 under Other

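This article collects Java code examples for the org.apache.flink.api.java.operators.DataSink.sortLocalOutput() method and shows how DataSink.sortLocalOutput() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the DataSink.sortLocalOutput() method are as follows:
Package path: org.apache.flink.api.java.operators.DataSink
Class name: DataSink
Method name: sortLocalOutput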

Introduction to DataSink.sortLocalOutput

Sorts each local partition of an org.apache.flink.api.java.tuple.Tuple data set on the specified field in the specified Order before it is emitted by the output format.

Note: Only tuple data sets can be sorted using integer field indices.

The tuple data set can be sorted on multiple fields in different orders by chaining #sortLocalOutput(int,Order) calls.
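To make the phrase "sorts each local partition" concrete, the following is a minimal plain-Java sketch of the semantics described above. It is not Flink code and does not call any Flink API: the names LocalSortSketch, Row, and sortEachPartition are hypothetical, and each inner list stands in for the records that one parallel sink instance would receive. It models two chained sort keys (field f0 ascending, then field f1 descending) and shows that every partition is ordered on its own while no order holds across partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of a local (per-partition) sort; no Flink classes are used.
final class LocalSortSketch {

    // Hypothetical stand-in for a two-field tuple (f0, f1).
    static final class Row {
        final int f0;
        final String f1;

        Row(int f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Sorts every partition independently: f0 ascending, ties broken by f1 descending.
    // This mirrors chaining two sort keys, the first key having the higher priority.
    static List<List<Row>> sortEachPartition(List<List<Row>> partitions) {
        Comparator<Row> byF0Asc = Comparator.comparingInt(r -> r.f0);
        Comparator<Row> byF1Desc = (a, b) -> b.f1.compareTo(a.f1);
        Comparator<Row> order = byF0Asc.thenComparing(byF1Desc);

        List<List<Row>> result = new ArrayList<>();
        for (List<Row> partition : partitions) {
            List<Row> sorted = new ArrayList<>(partition); // leave the input untouched
            sorted.sort(order);
            result.add(sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Row>> partitions = Arrays.asList(
                Arrays.asList(new Row(2, "a"), new Row(1, "x"), new Row(1, "z")),
                Arrays.asList(new Row(3, "m"), new Row(0, "q")));

        // Each inner list is sorted, but the concatenation is not globally sorted.
        System.out.println(sortEachPartition(partitions));
        // prints [[(1,z), (1,x), (2,a)], [(0,q), (3,m)]]
    }
}
```

With a single partition the local order is also the overall order, which is why the integration tests further down in this article set the sink parallelism to 1 before comparing the output in strict order.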

Code examples

Code example source: apache/flink

@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailPojoInvalidField() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<CustomType> pojoDs = env
      .fromCollection(pojoData);
  // must not work
  pojoDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("myInt", Order.ASCENDING)
    .sortLocalOutput("notThere", Order.DESCENDING);
}

Code example source: apache/flink

@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailTupleInv() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // must not work
  tupleDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("notThere", Order.ASCENDING)
    .sortLocalOutput("f4", Order.DESCENDING);
}

Code example source: apache/flink

@Test(expected = IndexOutOfBoundsException.class)
public void testFailTupleIndexOutOfBounds() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // must not work
  tupleDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput(3, Order.ASCENDING)
    .sortLocalOutput(5, Order.DESCENDING);
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testArrayOrderFull() {
  List<Object[]> arrayData = new ArrayList<>();
  arrayData.add(new Object[0]);
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Object[]> pojoDs = env
      .fromCollection(arrayData);
  // must not work
  pojoDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("*", Order.ASCENDING);
}

Code example source: apache/flink

@Test
public void testTupleTwoOrderExp() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput("f1", Order.ASCENDING)
      .sortLocalOutput("f4", Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testTupleTwoOrderIdx() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput(0, Order.ASCENDING)
      .sortLocalOutput(3, Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testPojoTwoOrder() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<CustomType> pojoDs = env
      .fromCollection(pojoData);
  // should work
  try {
    pojoDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput("myLong", Order.ASCENDING)
      .sortLocalOutput("myString", Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testTupleSingleOrderExpFull() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should not work
  tupleDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("*", Order.ANY);
}

Code example source: apache/flink

@Test
public void testTupleTwoOrderMixed() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput(4, Order.ASCENDING)
      .sortLocalOutput("f2", Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testFailPojoIdx() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<CustomType> pojoDs = env
      .fromCollection(pojoData);
  // must not work
  pojoDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput(1, Order.DESCENDING);
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testPojoSingleOrderFull() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<CustomType> pojoDs = env
      .fromCollection(pojoData);
  // must not work
  pojoDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("*", Order.ASCENDING);
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder2() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Long> longDs = env
      .generateSequence(0, 2);
  // must not work
  longDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("0", Order.ASCENDING);
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder3() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Long> longDs = env
      .generateSequence(0, 2);
  // must not work
  longDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput("nope", Order.ASCENDING);
}

Code example source: apache/flink

@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder1() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Long> longDs = env
      .generateSequence(0, 2);
  // must not work
  longDs.writeAsText("/tmp/willNotHappen")
    .sortLocalOutput(0, Order.ASCENDING);
}

Code example source: apache/flink

@Test
public void testTupleSingleOrderIdx() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.writeAsText("/tmp/willNotHappen").sortLocalOutput(0, Order.ANY);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testPrimitiveOrder() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Long> longDs = env
      .generateSequence(0, 2);
  // should work
  try {
    longDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput("*", Order.ASCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testPojoSingleOrder() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<CustomType> pojoDs = env
      .fromCollection(pojoData);
  // should work
  try {
    pojoDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput("myString", Order.ASCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testTupleSingleOrderExp() {
  final ExecutionEnvironment env = ExecutionEnvironment
      .getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
      .fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.writeAsText("/tmp/willNotHappen")
      .sortLocalOutput("f0", Order.ANY);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: apache/flink

@Test
public void testIntSortingParallelism1() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
  ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1);
  env.execute();
  String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
  compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}

Code example source: apache/flink

@Test
public void testPojoSortingSingleParallelism1() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
  ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1);
  env.execute();
  String expected = "1 First (10,100,1000,One) 10100\n" +
      "2 First_ (10,105,1000,One) 10200\n" +
      "3 First (11,102,3000,One) 10200\n" +
      "4 First_ (11,106,1000,One) 10300\n" +
      "5 First (11,102,2000,One) 10100\n" +
      "6 Second_ (20,200,2000,Two) 10100\n" +
      "7 Third (31,301,2000,Three) 10200\n" +
      "8 Third_ (30,300,1000,Three) 10100\n";
  compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}
