本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.flatMap()方法的一些代码示例,展示了KeyedStream.flatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KeyedStream.flatMap()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:flatMap
方法描述:暂无
代码示例来源:origin: apache/flink
.flatMap(new StateMachineMapper());
代码示例来源:origin: apache/flink
/**
 * Entry point of the general purpose test job.
 *
 * <p>Reads events from the configured source, assigns timestamps/watermarks and keys them by
 * {@code Event::getKey}. Depending on {@code isOriginalJobVariant}, either the original or the
 * upgraded stateful operations are applied. Their output goes through the semantics check mapper
 * and is printed.
 *
 * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    setupEnvironment(env, params);

    final KeyedStream<Event, Integer> keyedEvents = env
        .addSource(createEventSource(params))
        .name("EventSource")
        .uid("EventSource")
        .assignTimestampsAndWatermarks(createTimestampExtractor(params))
        .keyBy(Event::getKey);

    // Kryo serializer for ComplexPayload, built against this job's ExecutionConfig.
    final List<TypeSerializer<ComplexPayload>> payloadSerializers =
        Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));

    // Pick the variant of stateful operations to run (original vs. upgraded job).
    final KeyedStream<Event, Integer> statefulResult;
    if (isOriginalJobVariant(params)) {
        statefulResult = applyOriginalStatefulOperations(
            keyedEvents, payloadSerializers, Collections.emptyList());
    } else {
        statefulResult = applyUpgradedStatefulOperations(
            keyedEvents, payloadSerializers, Collections.emptyList());
    }

    statefulResult
        .flatMap(createSemanticsCheckMapper(params))
        .name("SemanticsCheckMapper")
        .addSink(new PrintSinkFunction<>());

    env.execute("General purpose test job");
}
代码示例来源:origin: apache/flink
.flatMap(new TestFlatMap());
代码示例来源:origin: apache/flink
.flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
.addSink(new PrintSinkFunction<>());
代码示例来源:origin: apache/flink
.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.flatMap(createSlidingWindowCheckMapper(pt))
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)
代码示例来源:origin: apache/flink
/**
 * Entry point of the state TTL test job.
 *
 * <p>Reads state updates from a {@code TtlStateUpdateSource} and keys them by
 * {@code TtlStateUpdate::getKey}. They are checked in a {@code TtlVerifyUpdateFunction} configured
 * with a TTL that uses both incremental cleanup and full-snapshot cleanup. The verifier's output is
 * printed by a sink named "PrintFailedVerifications".
 *
 * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    setupEnvironment(env, params);

    // Time provider handed to the verification function below.
    final MonotonicTTLTimeProvider timeProvider = setBackendWithCustomTTLTimeProvider(env);
    final TtlTestConfig testConfig = TtlTestConfig.fromArgs(params);

    // See StateTtlConfig.Builder#cleanupIncrementally for the meaning of (5, true).
    final StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(testConfig.ttl)
        .cleanupIncrementally(5, true)
        .cleanupFullSnapshot()
        .build();

    final TtlStateUpdateSource updateSource = new TtlStateUpdateSource(
        testConfig.keySpace, testConfig.sleepAfterElements, testConfig.sleepTime);

    env.addSource(updateSource)
        .name("TtlStateUpdateSource")
        .keyBy(TtlStateUpdate::getKey)
        .flatMap(new TtlVerifyUpdateFunction(
            ttlConfig, timeProvider, testConfig.reportStatAfterUpdatesNum))
        .name("TtlVerifyUpdateFunction")
        .addSink(new PrintSinkFunction<>())
        .name("PrintFailedVerifications");

    env.execute("State TTL test job");
}
代码示例来源:origin: apache/flink
/**
 * Runs the following program.
 * <pre>
 * [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (keyBy/flatMap)->(sink) ]
 * </pre>
 *
 * <p>A failure position is drawn uniformly from {@code [failurePosMin, failurePosMax)}, i.e.
 * between 40% and 70% of {@code NUM_STRINGS / PARALLELISM}. It is handed to
 * {@code OnceFailingAggregator}.
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);

    final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
    final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
    // An empty range would make the modulo below divide by zero.
    assertTrue("Broken test setup", failurePosMax > failurePosMin);
    // Random#nextLong() may be negative and '%' keeps the sign of the dividend. floorMod always
    // yields an offset in [0, range), so failurePos stays within [failurePosMin, failurePosMax).
    final long failurePos =
        Math.floorMod(new Random().nextLong(), failurePosMax - failurePosMin) + failurePosMin;

    env.enableCheckpointing(200);

    DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));

    stream
        // first vertex, chained to the source
        // this filter throttles the flow until at least one checkpoint
        // is complete, to make sure this program does not run without
        // a completed checkpoint
        .filter(new StringRichFilterFunction())
        // -------------- second vertex - one-to-one connected ----------------
        .map(new StringPrefixCountRichMapFunction())
        .startNewChain()
        .map(new StatefulCounterFunction())
        // -------------- third vertex - keyed flatMap and the sink ----------------
        .keyBy("prefix")
        .flatMap(new OnceFailingAggregator(failurePos))
        .addSink(new ValidatingSink());
}
代码示例来源:origin: apache/flink
/**
 * Checks that a nested POJO field ({@code "stats.count"}) can be used as the sum aggregation field.
 *
 * <p>Records are keyed by {@code aaa} and summed over {@code stats.count}. A second keyed flatMap
 * then validates the rolling sums:
 * <ul>
 *   <li>the first record seen for an {@code aaa} value must carry 123;</li>
 *   <li>every later record must carry 2 * 123.</li>
 * </ul>
 * The check function emits nothing; it only throws on a mismatch.
 */
@Test
public void testNestedPojoFieldAccessor() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableObjectReuse();
    env.setParallelism(4);

    DataStream<Data> input = env.fromCollection(elements);

    DataStream<Data> checked = input
        .keyBy("aaa")
        .sum("stats.count")
        .keyBy("aaa")
        .flatMap(new FlatMapFunction<Data, Data>() {
            // First record seen per 'aaa' value; presumably aaa is always in [0, 2].
            Data[] seenFirst = new Data[3];

            @Override
            public void flatMap(Data value, Collector<Data> out) throws Exception {
                if (seenFirst[value.aaa] != null) {
                    // Later records: the rolling sum must have doubled.
                    if (value.stats.count != 2 * 123) {
                        throw new RuntimeException("Expected stats.count to be 2 * 123");
                    }
                    return;
                }
                seenFirst[value.aaa] = value;
                if (value.stats.count != 123) {
                    throw new RuntimeException("Expected stats.count to be 123");
                }
            }
        });

    checked.print();
    env.execute();
}
代码示例来源:origin: apache/flink
.sum("sum")
.keyBy("aaa", "abc", "wxyz")
.flatMap(new FlatMapFunction<Data, Data>() {
private static final long serialVersionUID = 788865239171396315L;
Data[] first = new Data[3];
代码示例来源:origin: apache/flink
.sum("sum")
.keyBy("aaa", "stats.count")
.flatMap(new FlatMapFunction<Data, Data>() {
private static final long serialVersionUID = -3678267280397950258L;
Data[] first = new Data[3];
代码示例来源:origin: apache/flink
.flatMap(new DuplicateFilter())
.setParallelism(ITER_TEST_PARALLELISM)
.iterate();
代码示例来源:origin: apache/flink
.flatMap(new StatefulFlatMap()).setParallelism(1)
.addSink(new CountingSink())
.setParallelism(1);
代码示例来源:origin: apache/flink
.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
代码示例来源:origin: apache/flink
.addSource(nonParallelSource).uid("CheckpointingSource1")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap1")
.keyBy(0)
.transform(
.addSource(parallelSource).uid("CheckpointingSource2")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap2")
.keyBy(0)
.transform(
代码示例来源:origin: apache/flink
.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(
代码示例来源:origin: vasia/gelly-streaming
/**
 * Returns a graph stream with duplicate edges removed.
 *
 * <p>Edges are keyed on tuple field 0 and passed through a {@code DistinctEdgeMapper}, which
 * stores a neighborhood set for each vertex to recognize repeated edges.
 *
 * @return a graph stream with no duplicate edges, sharing this stream's context
 */
@Override
public SimpleEdgeStream<K, EV> distinct() {
    final DataStream<Edge<K, EV>> uniqueEdges =
        edges.keyBy(0).flatMap(new DistinctEdgeMapper<K, EV>());
    return new SimpleEdgeStream<>(uniqueEdges, getContext());
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Builds per-vertex neighborhood state as adjacency lists held in {@code TreeSet}s.
 *
 * @param directed when true, only the out-neighbors are stored; otherwise the edges of the
 *                 undirected view of this graph are used, so both directions are considered
 * @return a stream of Tuple3 where the first two fields identify the processed edge and the third
 *         field is the adjacency list that was updated by processing that edge
 */
public DataStream<Tuple3<K, K, TreeSet<K>>> buildNeighborhood(boolean directed) {
    final DataStream<Edge<K, EV>> asGiven = getEdges();
    // Undirected mode switches to the edge stream of the undirected view.
    final DataStream<Edge<K, EV>> input = directed ? asGiven : undirected().getEdges();
    return input.keyBy(0).flatMap(new BuildNeighborhoods<K, EV>());
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Entry point of the exact triangle count example.
 *
 * <p>The pipeline has four steps:
 * <ol>
 *   <li>build undirected neighborhoods;</li>
 *   <li>project canonical edges;</li>
 *   <li>intersect neighborhoods keyed on fields (0, 1);</li>
 *   <li>sum and emit counters keyed on field 0.</li>
 * </ol>
 * Results go to {@code resultPath} as text, or are printed when no path was given.
 *
 * @param args command-line arguments; the job exits early if {@code parseParameters} rejects them
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SimpleEdgeStream<Integer, NullValue> graph = getGraphStream(env);

    DataStream<Tuple2<Integer, Integer>> triangleCounts = graph
        .buildNeighborhood(false)
        .map(new ProjectCanonicalEdges())
        .keyBy(0, 1)
        .flatMap(new IntersectNeighborhoods())
        .keyBy(0)
        .flatMap(new SumAndEmitCounters());

    if (resultPath == null) {
        triangleCounts.print();
    } else {
        triangleCounts.writeAsText(resultPath);
    }

    env.execute("Exact Triangle Count");
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Entry point of the streaming connected components example.
 *
 * <p>Edges enter an iteration. Inside it they are keyed on field 0 and processed by
 * {@code AssignComponents}, whose output closes the loop. The stream returned by
 * {@code closeWith} is printed.
 *
 * @param args command-line arguments; the job exits early if {@code parseParameters} rejects them
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    // Set up the environment
    if (!parseParameters(args)) {
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<Long, Long>> edges = getEdgesDataSet(env);

    IterativeStream<Tuple2<Long, Long>> loop = edges.iterate();
    DataStream<Tuple2<Long, Long>> assigned = loop.keyBy(0).flatMap(new AssignComponents());
    DataStream<Tuple2<Long, Long>> result = loop.closeWith(assigned);

    // Emit the results
    result.print();

    env.execute("Streaming Connected Components");
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Entry point of the streaming degree distribution example.
 *
 * <p>The pipeline has three steps:
 * <ol>
 *   <li>each edge event becomes (vertexID, 1) or (vertexID, -1) for an addition or deletion;</li>
 *   <li>records grouped by vertex ID maintain a degree per vertex;</li>
 *   <li>records grouped by degree emit the current count.</li>
 * </ol>
 * The result is written as text to {@code resultPath}. When no result path is set, it is printed
 * instead, rather than passing {@code null} to {@code writeAsText}.
 *
 * @param args command-line arguments; the job exits early if {@code parseParameters} rejects them
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<Integer, Integer, EventType>> edges = getGraphStream(env);
    writeOrPrint(
        // 1. emit (vertexID, 1) or (vertexID, -1) for addition or deletion
        edges.flatMap(new EmitVerticesWithChange())
            // group by vertex ID and maintain degree per vertex
            .keyBy(0).flatMap(new VertexDegreeCounts())
            // group by degree and emit current count
            .keyBy(0).map(new DegreeDistributionMap()));
    env.execute("Streaming Degree Distribution");
}

/** Writes {@code stream} as text to {@code resultPath}, or prints it when no path was given. */
private static <T> void writeOrPrint(DataStream<T> stream) {
    if (resultPath != null) {
        stream.writeAsText(resultPath);
    } else {
        stream.print();
    }
}
内容来源于网络,如有侵权,请联系作者删除!