org.apache.flink.api.java.operators.MapOperator.output()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(15.0k)|赞(0)|评价(0)|浏览(80)

本文整理了Java中org.apache.flink.api.java.operators.MapOperator.output()方法的一些代码示例,展示了MapOperator.output()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.output()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:output

MapOperator.output介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Wires a printing sink onto the data set produced by the parent Python operation:
 * bytes are deserialized to strings, then printed (to stderr when {@code info.toError}).
 */
private void createPrintSink(PythonOperationInfo info) {
  // Resolve the upstream data set registered under the parent operation's id.
  DataSet<byte[]> source = sets.getDataSet(info.parentID);

  source
      .map(new StringDeserializerMap())
      .setParallelism(info.parallelism)
      .name("PrintSinkPreStep")
      .output(new PrintingOutputFormat<String>(info.toError))
      .setParallelism(info.parallelism);
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryOp() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    // Broadcast set holding the suffix that the mapper appends to each element.
    DataSet<String> broadcastData = env.fromElements(SUFFIX);

    List<String> collected = new ArrayList<String>();
    env.fromElements(TEST_DATA)
        .map(new SuffixAppender()).withBroadcastSet(broadcastData, BC_VAR_NAME)
        .output(new LocalCollectionOutputFormat<String>(collected));
    env.execute();

    // Every input element must come back exactly once, with the suffix attached
    // somewhere after position 0.
    assertEquals(TEST_DATA.length, collected.size());
    for (String value : collected) {
      assertTrue(value.indexOf(SUFFIX) > 0);
    }
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testAccumulator() {
  try {
    final int numElements = 100;

    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    // The mapper counts elements via an accumulator; the records themselves are discarded.
    env.generateSequence(1, numElements)
      .map(new CountingMapper())
      .output(new DiscardingOutputFormat<Long>());

    JobExecutionResult executionResult = env.execute();

    // The runtime must be reported and the accumulator must have counted every element.
    assertTrue(executionResult.getNetRuntime() >= 0);
    assertEquals(numElements, (int) executionResult.getAccumulatorResult(ACCUMULATOR_NAME));
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

/**
 * Builds a plan that streams an infinite integer source through {@code mapper} into a
 * discarding sink, then hands the plan to {@code runAndCancelJob} with time bounds of
 * 5 and 10 seconds (semantics of the bounds defined by that helper).
 */
public void executeTask(MapFunction<Integer, Integer> mapper) throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // The unbounded source guarantees the job is still running when it gets cancelled.
  env
      .createInput(new InfiniteIntegerInputFormat(false))
      .map(mapper)
      .output(new DiscardingOutputFormat<Integer>());

  env.setParallelism(PARALLELISM);

  runAndCancelJob(env.createProgramPlan(), 5 * 1000, 10 * 1000);
}

代码示例来源:origin: apache/flink

@Test
public void testNoBreakerForIndependentVariable() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> mainInput = env.fromElements("test");
    DataSet<String> broadcastInput = env.fromElements("test");

    // The broadcast variable originates from an independent source, so the optimizer
    // must not insert any temping / pipeline breaker on either input.
    mainInput.map(new IdentityMapper<String>()).withBroadcastSet(broadcastInput, "some name")
        .output(new DiscardingOutputFormat<String>());

    Plan plan = env.createProgramPlan();
    OptimizedPlan optimizedPlan = compileNoStats(plan);

    SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
    SingleInputPlanNode mapNode = (SingleInputPlanNode) sinkNode.getInput().getSource();

    // Neither the regular input nor the broadcast input may be materialized.
    assertEquals(TempMode.NONE, mapNode.getInput().getTempMode());
    assertEquals(TempMode.NONE, mapNode.getBroadcastInputs().get(0).getTempMode());

    // Both channels must stay fully pipelined.
    assertEquals(DataExchangeMode.PIPELINED, mapNode.getInput().getDataExchangeMode());
    assertEquals(DataExchangeMode.PIPELINED, mapNode.getBroadcastInputs().get(0).getDataExchangeMode());
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testFaultyMergeAccumulator() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  // Exercise exception forwarding for an accumulator whose merge step is faulty
  // (per the mapper's name); the job is expected to fail.
  env.generateSequence(0, 10000)
    .map(new FaultyMergeAccumulatorUsingMapper())
    .output(new DiscardingOutputFormat<>());

  assertAccumulatorsShouldFail(env.execute());
}

代码示例来源:origin: apache/flink

@Test
public void testBranchesOnlyInBCVariables1() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(100);

    DataSet<Long> mainInput = env.generateSequence(1, 10);
    DataSet<Long> broadcastInput = env.generateSequence(1, 10);

    // The same broadcast set feeds two chained mappers, so the plan branches
    // only through broadcast variables — the optimizer must still compile it.
    mainInput
      .map(new IdentityMapper<Long>()).withBroadcastSet(broadcastInput, "name1")
      .map(new IdentityMapper<Long>()).withBroadcastSet(broadcastInput, "name2")
      .output(new DiscardingOutputFormat<Long>());

    Plan plan = env.createProgramPlan();
    compileNoStats(plan);
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testFaultyAccumulator() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  // Exercise exception forwarding for a faulty accumulator implementation
  // (per the mapper's name); the job is expected to fail.
  env.generateSequence(0, 10000)
    .map(new FaultyAccumulatorUsingMapper())
    .output(new DiscardingOutputFormat<>());

  assertAccumulatorsShouldFail(env.execute());
}

代码示例来源:origin: apache/flink

@Test
public void testBranchAfterIteration() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(DEFAULT_PARALLELISM);

  DataSet<Long> source = env.generateSequence(0, 1);

  // A simple bulk iteration whose result is consumed twice (a branch after the loop).
  IterativeDataSet<Long> iteration = source.iterate(10);
  DataSet<Long> iterationBody = iteration.map(new IdentityMapper<Long>()).name("Mapper");
  DataSet<Long> iterationResult = iteration.closeWith(iterationBody);

  // Branch 1: direct sink. Branch 2: sink behind an extra mapper.
  iterationResult.output(new DiscardingOutputFormat<Long>());
  iterationResult.map(new IdentityMapper<Long>())
      .output(new DiscardingOutputFormat<Long>());

  Plan plan = env.createProgramPlan();
  try {
    // Compilation must succeed despite the post-iteration branch.
    compileNoStats(plan);
  }
  catch (Exception e) {
    e.printStackTrace();
    Assert.fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionReadFieldsAnnotation() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
  input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // The mapper's read-field declaration (ReadSetMapper is defined elsewhere) must be
  // translated into the operator's semantic properties: exactly fields 0 and 2.
  FieldSet readFields = props.getReadFields(0);

  assertNotNull(readFields);
  assertEquals(2, readFields.size());
  assertTrue(readFields.contains(0));
  assertTrue(readFields.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
  input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // Fields 0 and 2 must be reported as forwarded in place (0 -> 0, 2 -> 2).
  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(0));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionAllForwardedExceptAnnotation() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
  input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // With the all-forwarded-except declaration (mapper defined elsewhere), fields 0
  // and 2 must still be forwarded in place.
  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(0));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Override
protected void testProgram() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // Outside of collection execution this test is only meaningful with parallelism > 1.
  if (!isCollectionExecution()) {
    Assert.assertTrue(env.getParallelism() > 1);
  }

  // Route every record to partition 0; per the helper names, the mapper fails in any
  // other partition, so a clean run indicates the custom partitioner was honored.
  env.generateSequence(1, 1000)
    .partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
    .map(new FailExceptInPartitionZeroMapper())
    .output(new DiscardingOutputFormat<Long>());

  env.execute();
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionForwardedInLine2() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));

  // Forwarded fields declared inline at the call site: field 0 moves to 1, field 2 stays.
  input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
      .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(1));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionForwardedInLine3() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));

  // Inline forwarded-fields declaration: field 0 is forwarded to 1, field 2 kept in place.
  input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
      .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(1));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionForwardedInLine1() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));

  // The mapper carries no annotation of its own; the inline declaration must apply:
  // field 0 forwarded to 1, field 2 kept in place.
  input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
      .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(1));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testInvalidTypeAccumulator() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  // Two mappers using incompatible accumulator types (per their names); the job must
  // fail with an UnsupportedOperationException nested in the cause chain.
  env.generateSequence(0, 10000)
    .map(new IncompatibleAccumulatorTypesMapper())
    .map(new IncompatibleAccumulatorTypesMapper2())
    .output(new DiscardingOutputFormat<>());

  try {
    env.execute();
    fail("Should have failed.");
  } catch (JobExecutionException e) {
    // Two-level cause chain: JobExecutionException -> Exception -> UnsupportedOperationException.
    assertTrue("Root cause should be:",
        e.getCause() instanceof Exception);
    assertTrue("Root cause should be:",
        e.getCause().getCause() instanceof UnsupportedOperationException);
  }
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionMovingForwardedAnnotation() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
  input.map(new ShufflingMapper<Long>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // The shuffling mapper's forwarding declaration must map 0 -> 2, 1 -> 0, 2 -> 1.
  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField1 = props.getForwardingTargetFields(0, 1);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField1);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(2));
  assertTrue(targetsOfField1.contains(0));
  assertTrue(targetsOfField2.contains(1));
}

代码示例来源:origin: apache/flink

@Test
public void testUnaryFunctionWildcardForwardedAnnotation() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
  input.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // A wildcard forwarding declaration must mark every field as forwarded in place.
  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField1 = props.getForwardingTargetFields(0, 1);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField1);
  assertNotNull(targetsOfField2);
  assertTrue(targetsOfField0.contains(0));
  assertTrue(targetsOfField1.contains(1));
  assertTrue(targetsOfField2.contains(2));
}

代码示例来源:origin: apache/flink

@Test
public void testFunctionSkipCodeAnalysisAnnotationPrecedence() {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().setCodeAnalysisMode(CodeAnalysisMode.OPTIMIZE);

  @SuppressWarnings("unchecked")
  DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(Tuple3.of(3L, "test", 42));
  input
      .map(new WildcardForwardedMapperWithSkipAnnotation<Tuple3<Long, String, Integer>>())
      .output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());

  Plan plan = env.createProgramPlan();
  GenericDataSinkBase<?> sinkOperator = plan.getDataSinks().iterator().next();
  MapOperatorBase<?, ?, ?> mapOperator = (MapOperatorBase<?, ?, ?>) sinkOperator.getInput();
  SingleInputSemanticProperties props = mapOperator.getSemanticProperties();

  // Even in OPTIMIZE mode, the skip annotation on the mapper must take precedence:
  // no field may be reported as forwarded in place.
  FieldSet targetsOfField0 = props.getForwardingTargetFields(0, 0);
  FieldSet targetsOfField1 = props.getForwardingTargetFields(0, 1);
  FieldSet targetsOfField2 = props.getForwardingTargetFields(0, 2);

  assertNotNull(targetsOfField0);
  assertNotNull(targetsOfField1);
  assertNotNull(targetsOfField2);
  assertFalse(targetsOfField0.contains(0));
  assertFalse(targetsOfField1.contains(1));
  assertFalse(targetsOfField2.contains(2));
}

相关文章