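This article collects code examples for the Java method org.apache.flink.api.java.operators.DataSink.sortLocalOutput() and shows how DataSink.sortLocalOutput() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should be useful as a reference. Details of DataSink.sortLocalOutput() are as follows:
Package path: org.apache.flink.api.java.operators.DataSink
Class name: DataSink
Method name: sortLocalOutput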
Sorts each local partition of an org.apache.flink.api.java.tuple.Tuple data set on the specified field in the specified Order before it is emitted by the output format.
Note: Only tuple data sets can be sorted using integer field indices.
The tuple data set can be sorted on multiple fields in different orders by chaining #sortLocalOutput(int,Order) calls.
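The chaining behavior described above can be illustrated without Flink. The following is a minimal plain-Java sketch and not Flink's implementation: it assumes a partition is an in-memory list of two-field rows modeled as int[] arrays, and the names LocalSortSketch and sortPartition are hypothetical. It orders one partition on field 0 ascending and then on field 1 descending, which is the ordering one would expect from two chained sortLocalOutput calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class LocalSortSketch {

    // Sorts a single partition on field 0 ascending, then on field 1 descending.
    // Each row is an int[] with two entries standing in for a two-field tuple.
    static List<int[]> sortPartition(List<int[]> partition) {
        Comparator<int[]> order = Comparator
                .comparingInt((int[] row) -> row[0])
                .thenComparing(Comparator.comparingInt((int[] row) -> row[1]).reversed());
        List<int[]> sorted = new ArrayList<>(partition); // leave the input untouched
        sorted.sort(order);
        return sorted;
    }

    public static void main(String[] args) {
        List<int[]> partition = List.of(
                new int[] {2, 10},
                new int[] {1, 5},
                new int[] {1, 7});
        for (int[] row : sortPartition(partition)) {
            System.out.println(Arrays.toString(row));
        }
        // prints [1, 7], then [1, 5], then [2, 10]
    }
}
```

The first comparator decides the primary order, and each chained comparator only breaks ties left by the previous ones, mirroring how the first sortLocalOutput call in a chain names the primary sort field.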
Code example source: apache/flink
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailPojoInvalidField() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CustomType> pojoDs = env.fromCollection(pojoData);
    // must not work: the second field expression does not name an existing field
    pojoDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("myInt", Order.ASCENDING)
            .sortLocalOutput("notThere", Order.DESCENDING);
}
Code example source: apache/flink
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailTupleInv() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // must not work: "notThere" is not a tuple field
    tupleDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("notThere", Order.ASCENDING)
            .sortLocalOutput("f4", Order.DESCENDING);
}
Code example source: apache/flink
@Test(expected = IndexOutOfBoundsException.class)
public void testFailTupleIndexOutOfBounds() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // must not work: a Tuple5 only has field indices 0 to 4
    tupleDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput(3, Order.ASCENDING)
            .sortLocalOutput(5, Order.DESCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testArrayOrderFull() {
    List<Object[]> arrayData = new ArrayList<>();
    arrayData.add(new Object[0]);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Object[]> pojoDs = env.fromCollection(arrayData);
    // must not work
    pojoDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("*", Order.ASCENDING);
}
Code example source: apache/flink
@Test
public void testTupleTwoOrderExp() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should work: two chained field expressions
    try {
        tupleDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput("f1", Order.ASCENDING)
                .sortLocalOutput("f4", Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testTupleTwoOrderIdx() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should work: two chained field indices
    try {
        tupleDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput(0, Order.ASCENDING)
                .sortLocalOutput(3, Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testPojoTwoOrder() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CustomType> pojoDs = env.fromCollection(pojoData);
    // should work
    try {
        pojoDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput("myLong", Order.ASCENDING)
                .sortLocalOutput("myString", Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testTupleSingleOrderExpFull() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should not work
    tupleDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("*", Order.ANY);
}
Code example source: apache/flink
@Test
public void testTupleTwoOrderMixed() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should work: a field index and a field expression can be mixed in one chain
    try {
        tupleDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput(4, Order.ASCENDING)
                .sortLocalOutput("f2", Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testFailPojoIdx() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CustomType> pojoDs = env.fromCollection(pojoData);
    // must not work: only tuple data sets can be sorted by integer field index
    pojoDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput(1, Order.DESCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testPojoSingleOrderFull() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CustomType> pojoDs = env.fromCollection(pojoData);
    // must not work
    pojoDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("*", Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder2() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> longDs = env.generateSequence(0, 2);
    // must not work
    longDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("0", Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder3() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> longDs = env.generateSequence(0, 2);
    // must not work
    longDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput("nope", Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testFailPrimitiveOrder1() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> longDs = env.generateSequence(0, 2);
    // must not work: only tuple data sets can be sorted by integer field index
    longDs.writeAsText("/tmp/willNotHappen")
            .sortLocalOutput(0, Order.ASCENDING);
}
Code example source: apache/flink
@Test
public void testTupleSingleOrderIdx() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should work
    try {
        tupleDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput(0, Order.ANY);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testPrimitiveOrder() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> longDs = env.generateSequence(0, 2);
    // should work
    try {
        longDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput("*", Order.ASCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testPojoSingleOrder() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CustomType> pojoDs = env.fromCollection(pojoData);
    // should work
    try {
        pojoDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput("myString", Order.ASCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testTupleSingleOrderExp() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
            .fromCollection(emptyTupleData, tupleTypeInfo);
    // should work
    try {
        tupleDs.writeAsText("/tmp/willNotHappen")
                .sortLocalOutput("f0", Order.ANY);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testIntSortingParallelism1() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
    // a single sink task, so the one local partition holds the whole data set
    ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1);
    env.execute();
    String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
    compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}
Code example source: apache/flink
@Test
public void testPojoSortingSingleParallelism1() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
    ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1);
    env.execute();
    String expected = "1 First (10,100,1000,One) 10100\n" +
            "2 First_ (10,105,1000,One) 10200\n" +
            "3 First (11,102,3000,One) 10200\n" +
            "4 First_ (11,106,1000,One) 10300\n" +
            "5 First (11,102,2000,One) 10100\n" +
            "6 Second_ (20,200,2000,Two) 10100\n" +
            "7 Third (31,301,2000,Three) 10200\n" +
            "8 Third_ (30,300,1000,Three) 10100\n";
    compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}
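The two result-checking tests set the sink parallelism to 1 before comparing output in strict order. Since sortLocalOutput sorts each local partition rather than the data set as a whole, the plain-Java sketch below (not Flink code) illustrates why that matters. It models partitions as in-memory lists and the combined output as their concatenation, which is a simplifying assumption; LocalVsGlobalSketch and sortEachPartition are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LocalVsGlobalSketch {

    // Sorts every partition independently in descending order and
    // concatenates the results in partition order.
    static List<Integer> sortEachPartition(List<List<Integer>> partitions) {
        List<Integer> output = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            List<Integer> sorted = new ArrayList<>(partition);
            sorted.sort(Comparator.reverseOrder());
            output.addAll(sorted);
        }
        return output;
    }

    public static void main(String[] args) {
        // Two partitions: each one is sorted, the combined output is not.
        System.out.println(sortEachPartition(List.of(List.of(1, 5, 3), List.of(4, 2))));
        // prints [5, 3, 1, 4, 2]

        // One partition: the local order is also the global order.
        System.out.println(sortEachPartition(List.of(List.of(1, 5, 3, 4, 2))));
        // prints [5, 4, 3, 2, 1]
    }
}
```

With more than one partition the sketch yields sorted runs, not a totally ordered result, which is consistent with the tests pinning the parallelism to 1 before asserting a strict line order.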