本文整理了Java中org.apache.flink.api.java.operators.MapOperator.output()
方法的一些代码示例,展示了MapOperator.output()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。MapOperator.output()
方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:output
暂无
代码示例来源:origin: apache/flink
private void createPrintSink(PythonOperationInfo info) {
DataSet<byte[]> parent = sets.getDataSet(info.parentID);
parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
.output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryOp() {
try {
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
DataSet<String> bcData = env.fromElements(SUFFIX);
List<String> result = new ArrayList<String>();
env.fromElements(TEST_DATA)
.map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME)
.output(new LocalCollectionOutputFormat<String>(result));
env.execute();
assertEquals(TEST_DATA.length, result.size());
for (String s : result) {
assertTrue(s.indexOf(SUFFIX) > 0);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
@Test
public void testAccumulator() {
try {
final int numElements = 100;
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
env.generateSequence(1, numElements)
.map(new CountingMapper())
.output(new DiscardingOutputFormat<Long>());
JobExecutionResult result = env.execute();
assertTrue(result.getNetRuntime() >= 0);
assertEquals(numElements, (int) result.getAccumulatorResult(ACCUMULATOR_NAME));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
public void executeTask(MapFunction<Integer, Integer> mapper) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env
.createInput(new InfiniteIntegerInputFormat(false))
.map(mapper)
.output(new DiscardingOutputFormat<Integer>());
env.setParallelism(PARALLELISM);
runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
}
代码示例来源:origin: apache/flink
@Test
public void testNoBreakerForIndependentVariable() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> source1 = env.fromElements("test");
DataSet<String> source2 = env.fromElements("test");
source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
.output(new DiscardingOutputFormat<String>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
assertEquals(TempMode.NONE, mapper.getBroadcastInputs().get(0).getTempMode());
assertEquals(DataExchangeMode.PIPELINED, mapper.getInput().getDataExchangeMode());
assertEquals(DataExchangeMode.PIPELINED, mapper.getBroadcastInputs().get(0).getDataExchangeMode());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
@Test
public void testFaultyMergeAccumulator() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
env.generateSequence(0, 10000)
.map(new FaultyMergeAccumulatorUsingMapper())
.output(new DiscardingOutputFormat<>());
assertAccumulatorsShouldFail(env.execute());
}
代码示例来源:origin: apache/flink
@Test
public void testBranchesOnlyInBCVariables1() {
try{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(100);
DataSet<Long> input = env.generateSequence(1, 10);
DataSet<Long> bc_input = env.generateSequence(1, 10);
input
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
.output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
compileNoStats(plan);
}
catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
@Test
public void testFaultyAccumulator() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
env.generateSequence(0, 10000)
.map(new FaultyAccumulatorUsingMapper())
.output(new DiscardingOutputFormat<>());
assertAccumulatorsShouldFail(env.execute());
}
代码示例来源:origin: apache/flink
@Test
public void testBranchAfterIteration() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Long> sourceA = env.generateSequence(0,1);
IterativeDataSet<Long> loopHead = sourceA.iterate(10);
DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
DataSet<Long> loopRes = loopHead.closeWith(loopTail);
loopRes.output(new DiscardingOutputFormat<Long>());
loopRes.map(new IdentityMapper<Long>())
.output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
try {
compileNoStats(plan);
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionReadFieldsAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet read = semantics.getReadFields(0);
assertNotNull(read);
assertEquals(2, read.size());
assertTrue(read.contains(0));
assertTrue(read.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionAllForwardedExceptAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(2));
}
代码示例来源:origin: apache/flink
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (!isCollectionExecution()) {
Assert.assertTrue(env.getParallelism() > 1);
}
env.generateSequence(1, 1000)
.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
.map(new FailExceptInPartitionZeroMapper())
.output(new DiscardingOutputFormat<Long>());
env.execute();
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionForwardedInLine2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionForwardedInLine3() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionForwardedInLine1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testInvalidTypeAccumulator() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
// Test Exception forwarding with faulty Accumulator implementation
env.generateSequence(0, 10000)
.map(new IncompatibleAccumulatorTypesMapper())
.map(new IncompatibleAccumulatorTypesMapper2())
.output(new DiscardingOutputFormat<>());
try {
env.execute();
fail("Should have failed.");
} catch (JobExecutionException e) {
assertTrue("Root cause should be:",
e.getCause() instanceof Exception);
assertTrue("Root cause should be:",
e.getCause().getCause() instanceof UnsupportedOperationException);
}
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionMovingForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ShufflingMapper<Long>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertNotNull(fw3);
assertTrue(fw1.contains(2));
assertTrue(fw2.contains(0));
assertTrue(fw3.contains(1));
}
代码示例来源:origin: apache/flink
@Test
public void testUnaryFunctionWildcardForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
input.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertNotNull(fw3);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(1));
assertTrue(fw3.contains(2));
}
代码示例来源:origin: apache/flink
@Test
public void testFunctionSkipCodeAnalysisAnnotationPrecedence() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(Tuple3.of(3L, "test", 42));
input
.map(new WildcardForwardedMapperWithSkipAnnotation<Tuple3<Long, String, Integer>>())
.output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertNotNull(fw3);
assertFalse(fw1.contains(0));
assertFalse(fw2.contains(1));
assertFalse(fw3.contains(2));
}
内容来源于网络,如有侵权,请联系作者删除!