本文整理了 Java 中 org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.map() 方法的一些代码示例,展示了 SingleOutputStreamOperator.map() 的具体用法。这些代码示例主要来源于 Github/Stackoverflow/Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。SingleOutputStreamOperator.map() 方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:map
暂无
代码示例来源:origin: apache/flink
/** Verifies that user-supplied uid hashes end up as the last id alternative of each job vertex. */
@Test
public void testUserProvidedHashing() {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    // The two manually assigned 32-character uid hashes (source and reduce).
    List<String> providedHashes =
            Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");

    env.addSource(new NoOpSourceFunction(), "src").setUidHash(providedHashes.get(0))
            .map(new NoOpMapFunction())
            .filter(new NoOpFilterFunction())
            .keyBy(new NoOpKeySelector())
            .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(providedHashes.get(1));

    StreamGraph streamGraph = env.getStreamGraph();

    // The hash index counts down: the first vertex returned is checked against
    // providedHashes.get(1), the second against providedHashes.get(0).
    int hashIndex = 1;
    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
        List<JobVertexID> alternatives = vertex.getIdAlternatives();
        Assert.assertEquals(
                alternatives.get(alternatives.size() - 1).toString(),
                providedHashes.get(hashIndex));
        hashIndex--;
    }
}
代码示例来源:origin: apache/flink
/**
 * Runs the following program.
 * <pre>
 * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);

    // Choose a random failure point between 40% and 70% of the per-subtask input.
    // NOTE(review): nextLong() may be negative, so the modulo result can fall below
    // minFailurePos (or below zero) — confirm this is intended.
    final long minFailurePos = (long) (0.4 * NUM_STRINGS / PARALLELISM);
    final long maxFailurePos = (long) (0.7 * NUM_STRINGS / PARALLELISM);
    final long chosenFailurePos =
            (new Random().nextLong() % (maxFailurePos - minFailurePos)) + minFailurePos;

    env.enableCheckpointing(200);

    DataStream<String> source = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));

    source
            // first vertex: chained to the source; the filter throttles the flow
            // until at least one checkpoint completes
            .filter(new StringRichFilterFunction())
            // second vertex: one-to-one connected stateful maps
            .map(new StringPrefixCountRichMapFunction())
            .startNewChain()
            .map(new StatefulCounterFunction())
            // third vertex: keyed aggregator (fails once) and the validating sink
            .keyBy("prefix")
            .flatMap(new OnceFailingAggregator(chosenFailurePos))
            .addSink(new ValidatingSink());
}
代码示例来源:origin: apache/flink
.setBufferTimeout(-1)
.name("A")
.map(value -> value)
.setBufferTimeout(0)
.name("B")
.map(value -> value)
.setBufferTimeout(12)
.name("C")
.map(value -> value)
.name("D");
代码示例来源:origin: apache/flink
/**
 * Pins the expected operator IDs for fixed uids. If these expected values ever change,
 * double-check that the change is not breaking the contract of
 * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
 */
@Test
public void testGetOperatorUniqueID() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    // Each mapper asserts at runtime that its operator ID matches the expected hash.
    env.fromElements(1, 2, 3)
            .map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
            .map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");

    env.execute();
}
代码示例来源:origin: apache/flink
.map(new MapFunction<NonSerializable, Integer>() {
private static final long serialVersionUID = 6906984044674568945L;
代码示例来源:origin: apache/flink
.map(new IdentityMapFunction())
.startNewChain()
代码示例来源:origin: apache/flink
.map(new Mapper(coordinateDir));
代码示例来源:origin: apache/flink
/**
 * Tests that a manual hash at the beginning of a chain is accepted.
 */
@Test
public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(4);

    // Only the chain's head node (the source) carries a uid; map and sink are chained to it.
    env.addSource(new NoOpSourceFunction()).uid("source")
            .map(new NoOpMapFunction())
            .addSink(new NoOpSinkFunction());

    // Building the job graph must succeed without a hash-assignment error.
    env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
代码示例来源:origin: apache/flink
/**
 * Runs a simple iteration topology, retrying with a doubled iteration timeout on each failed
 * attempt, and finally asserting that every parallel subtask saw iterated records.
 */
@SuppressWarnings("rawtypes")
@Test
public void testSimpleIteration() throws Exception {
    final int numRetries = 5;
    int timeoutScale = 1;

    for (int attempt = 0; attempt < numRetries; attempt++) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            iterated = new boolean[parallelism];

            DataStream<Boolean> source =
                    env.fromCollection(Collections.nCopies(parallelism * 2, false))
                            .map(noOpBoolMap).name("ParallelizeMap");

            IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);

            DataStream<Boolean> increment =
                    iteration.flatMap(new IterationHead()).map(noOpBoolMap);

            iteration.map(noOpBoolMap).addSink(new ReceiveCheckNoOpSink());
            iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());

            env.execute();

            for (boolean subtaskIterated : iterated) {
                assertTrue(subtaskIterated);
            }
            return; // success — loop is the last statement, so returning equals breaking out
        } catch (Throwable t) {
            LOG.info("Run " + (attempt + 1) + "/" + numRetries + " failed", t);
            if (attempt >= numRetries - 1) {
                throw t; // out of retries: surface the failure
            }
            timeoutScale *= 2; // give the iteration more time on the next attempt
        }
    }
}
代码示例来源:origin: apache/flink
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
代码示例来源:origin: apache/flink
/**
 * Tests that a collision on the manual hash throws an Exception.
 */
@Test(expected = IllegalArgumentException.class)
public void testManualHashAssignmentCollisionThrowsException() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(4);
    env.disableOperatorChaining();

    // The same uid is assigned twice, which must be rejected.
    env.addSource(new NoOpSourceFunction()).uid("source")
            .map(new NoOpMapFunction()).uid("source") // Collision
            .addSink(new NoOpSinkFunction());

    // This call is necessary to generate the job graph (and trigger the collision check).
    env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(parallelism, 1))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
代码示例来源:origin: apache/flink
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(numPartitions, 3))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
代码示例来源:origin: apache/flink
.addSource(kafkaSource)
.map(new PartitionValidatingMapper(numPartitions, 1))
.map(new FailingIdentityMapper<Integer>(failAfterElements))
.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
代码示例来源:origin: apache/flink
/** Verifies that user-provided uid hashes on operators inside a chain are supported. */
@Test
public void testUserProvidedHashingOnChainSupported() {
    final String sourceHash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    final String mapHash = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
    final String filterHash = "cccccccccccccccccccccccccccccccc";
    final String reduceHash = "dddddddddddddddddddddddddddddddd";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    // Every operator — including those chained together — carries its own uid hash.
    env.addSource(new NoOpSourceFunction(), "src").setUidHash(sourceHash)
            .map(new NoOpMapFunction()).setUidHash(mapHash)
            .filter(new NoOpFilterFunction()).setUidHash(filterHash)
            .keyBy(new NoOpKeySelector())
            .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(reduceHash);

    // Generating the job graph must succeed (no exception) for the hashes to be "supported".
    env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
/**
 * Runs the following program.
 * <pre>
 * [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    DataStream<String> source = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));

    // Records are discarded at the sink; presumably validation happens inside
    // OnceFailingPrefixCounter — TODO confirm against the enclosing test class.
    SinkFunction<PrefixCount> discardingSink = new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
            // Do nothing here
        }
    };

    source
            // first vertex, chained to the source
            .filter(new StringRichFilterFunction())
            .shuffle()
            // second vertex — the stateful one that also fails
            .map(new StringPrefixCountRichMapFunction())
            .startNewChain()
            .map(new StatefulCounterFunction())
            // third vertex — counter and the sink
            .keyBy("prefix")
            .map(new OnceFailingPrefixCounter(NUM_STRINGS))
            .addSink(discardingSink);
}
代码示例来源:origin: apache/flink
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
代码示例来源:origin: apache/flink
.map(new OnceFailingIdentityMapFunction(NUM_INPUT))
.keyBy(0)
.addSink(new MinEvictingQueueSink());
代码示例来源:origin: apache/flink
}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
内容来源于网络,如有侵权,请联系作者删除!