org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.map()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.4k)|赞(0)|评价(0)|浏览(114)

本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.map()方法的一些代码示例,展示了SingleOutputStreamOperator.map()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。SingleOutputStreamOperator.map()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:map

SingleOutputStreamOperator.map介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Verifies that hashes supplied through {@code setUidHash} appear as the last ID alternative
 * of the job vertices generated from the stream graph.
 *
 * <p>The loop counts {@code idx} down from 1, so the first vertex visited is compared against
 * the second user hash and the following vertex against the first one.
 */
@Test
public void testUserProvidedHashing() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
  env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
      .map(new NoOpMapFunction())
      .filter(new NoOpFilterFunction())
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
  StreamGraph streamGraph = env.getStreamGraph();
  int idx = 1;
  for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
    List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
    // assertEquals takes (expected, actual): the user-provided hash is the expectation,
    // the last ID alternative of the vertex is the value under test.
    Assert.assertEquals(userHashes.get(idx), idAlternatives.get(idAlternatives.size() - 1).toString());
    --idx;
  }
}

代码示例来源:origin: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 *
 * <p>A failure position is drawn at random from {@code [failurePosMin, failurePosMax)} and
 * handed to the {@code OnceFailingAggregator}.
 *
 * @param env the execution environment the program is built on
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
  final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
  final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
  // floorMod keeps the random offset non-negative. A plain '%' on a negative nextLong()
  // yields a negative offset, which would push failurePos below failurePosMin.
  final long failurePos = Math.floorMod(new Random().nextLong(), failurePosMax - failurePosMin) + failurePosMin;
  env.enableCheckpointing(200);
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // first vertex, chained to the source
      // this filter throttles the flow until at least one checkpoint
      // is complete, to make sure this program does not run without one
      // NOTE(review): the original comment was cut off after "without" - confirm the intent
      .filter(new StringRichFilterFunction())
          // -------------- second vertex - one-to-one connected ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
          // -------------- third vertex - reducer and the sink ----------------
      .keyBy("prefix")
      .flatMap(new OnceFailingAggregator(failurePos))
      .addSink(new ValidatingSink());
}

代码示例来源:origin: apache/flink

.setBufferTimeout(-1)
  .name("A")
.map(value -> value)
  .setBufferTimeout(0)
  .name("B")
.map(value -> value)
  .setBufferTimeout(12)
  .name("C")
.map(value -> value)
  .name("D");

代码示例来源:origin: apache/flink

/**
 * Executes a small job with two chained map operators that carry the fixed uids "42" and "44";
 * each {@code VerifyOperatorIDMapFunction} is constructed with a hard-coded ID string
 * (presumably the operator ID it expects to observe at runtime - confirm against the helper).
 *
 * <p>If expected values ever change, double check that the change is not breaking the contract of
 * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
 */
@Test
public void testGetOperatorUniqueID() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.fromElements(1, 2, 3)
    .map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
    .map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");
  env.execute();
}

代码示例来源:origin: apache/flink

.map(new MapFunction<NonSerializable, Integer>() {
  private static final long serialVersionUID = 6906984044674568945L;

代码示例来源:origin: apache/flink

.map(new IdentityMapFunction())
.startNewChain()

代码示例来源:origin: apache/flink

.map(new Mapper(coordinateDir));

代码示例来源:origin: apache/flink

/**
 * Tests that a manual hash at the beginning of a chain is accepted.
 *
 * <p>Only the source gets an explicit uid; the map and the sink have none. The test passes
 * as long as generating the job graph does not throw.
 */
@Test
public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.addSource(new NoOpSourceFunction()).uid("source")
      .map(new NoOpMapFunction())
      .addSink(new NoOpSinkFunction());
  // The job is never executed; the job graph is generated explicitly and its result discarded.
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {

代码示例来源:origin: apache/flink

/**
 * Runs a simple iterative job and checks that every entry of {@code iterated} was set.
 *
 * <p>The job is attempted up to five times. After each failed attempt the iteration timeout
 * factor is doubled; the failure of the last attempt is rethrown to the caller.
 */
@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
  final int maxAttempts = 5;
  int timeoutFactor = 1;
  int attempt = 0;
  while (attempt < maxAttempts) {
    try {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      iterated = new boolean[parallelism];

      DataStream<Boolean> source = env
          .fromCollection(Collections.nCopies(parallelism * 2, false))
          .map(noOpBoolMap)
          .name("ParallelizeMap");

      IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutFactor);
      DataStream<Boolean> increment = iteration
          .flatMap(new IterationHead())
          .map(noOpBoolMap);

      iteration
          .map(noOpBoolMap)
          .addSink(new ReceiveCheckNoOpSink());
      iteration
          .closeWith(increment)
          .addSink(new ReceiveCheckNoOpSink());

      env.execute();

      for (boolean flag : iterated) {
        assertTrue(flag);
      }
      // success: no further attempts needed
      return;
    } catch (Throwable failure) {
      LOG.info("Run " + (attempt + 1) + "/" + maxAttempts + " failed", failure);
      if (attempt >= maxAttempts - 1) {
        // last attempt: surface the failure
        throw failure;
      }
      // give the next attempt a longer iteration timeout
      timeoutFactor *= 2;
    }
    attempt++;
  }
}

代码示例来源:origin: apache/flink

.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

代码示例来源:origin: apache/flink

/**
 * Verifies that assigning the same manual uid to two different operators is rejected
 * with an {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
  // Deliberately shared by the source and the map operator to provoke the collision.
  final String duplicatedUid = "source";

  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.setParallelism(4);
  env.disableOperatorChaining();

  env.addSource(new NoOpSourceFunction())
      .uid(duplicatedUid)
      .map(new NoOpMapFunction())
      .uid(duplicatedUid)
      .addSink(new NoOpSinkFunction());

  // The job is never executed, so the job graph has to be generated explicitly here.
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

代码示例来源:origin: apache/flink

.addSource(kafkaSource)
.map(new PartitionValidatingMapper(numPartitions, 3))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

代码示例来源:origin: apache/flink

.addSource(kafkaSource)
.map(new PartitionValidatingMapper(numPartitions, 1))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);

代码示例来源:origin: apache/flink

/**
 * Assigns an explicit uid hash to the source, map, filter and reduce operators of one
 * pipeline and then generates the job graph.
 *
 * <p>There are no assertions; the test passes as long as job graph generation does not throw.
 */
@Test
public void testUserProvidedHashingOnChainSupported() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
      .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
  // The job is never executed; only the job graph is generated and its result discarded.
  env.getStreamGraph().getJobGraph();
}

代码示例来源:origin: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 *
 * <p>The {@code -s->} edge in the diagram corresponds to the {@code shuffle()} call after the
 * filter. The sink at the end discards every record it receives.
 *
 * @param env the execution environment the program is built on
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

代码示例来源:origin: apache/flink

.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())

代码示例来源:origin: apache/flink

.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
.keyBy(0)
.addSink(new MinEvictingQueueSink());

代码示例来源:origin: apache/flink

}).map(new MapFunction<Tuple2<Event, Long>, Event>() {

相关文章

微信公众号

最新文章

更多