本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.flatMap()
方法的一些代码示例,展示了SingleOutputStreamOperator.flatMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.flatMap()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:flatMap
暂无
代码示例来源:origin: apache/flink
.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
代码示例来源:origin: apache/flink
.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
代码示例来源:origin: org.apache.flink/flink-cep_2.10
/**
* Applies a flat select function to the detected pattern sequence. For each pattern sequence
* the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
* can produce an arbitrary number of resulting elements.
*
* @param patternFlatSelectFunction The pattern flat select function which is called for each
* detected pattern sequence.
* @param <R> Type of the resulting elements
* @param outTypeInfo Explicit specification of output type.
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern);
return patternStream.flatMap(
new PatternFlatSelectMapper<>(
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
)).returns(outTypeInfo);
}
代码示例来源:origin: vasia/gelly-streaming
/**
* Returns a global aggregate on the previously split vertex stream
*
* @param edgeMapper the mapper that converts the edge stream to a vertex stream
* @param vertexMapper the mapper that aggregates vertex values
* @param collectUpdates boolean specifying whether the aggregate should only be collected when there is an update
* @param <VV> the return value type
* @return a stream of the aggregated values
*/
public <VV> DataStream<VV> globalAggregate(FlatMapFunction<Edge<K, EV>, Vertex<K, VV>> edgeMapper,
FlatMapFunction<Vertex<K, VV>, VV> vertexMapper, boolean collectUpdates) {
DataStream<VV> result = this.edges.flatMap(edgeMapper)
.setParallelism(1)
.flatMap(vertexMapper)
.setParallelism(1);
if (collectUpdates) {
result = result.flatMap(new GlobalAggregateMapper<VV>())
.setParallelism(1);
}
return result;
}
代码示例来源:origin: com.alibaba.blink/flink-examples-streaming
System.out.println("Set resource spec: " + resourceSpec.toString());
counts = ((SingleOutputStreamOperator<String>) text).setResources(resourceSpec)
.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
代码示例来源:origin: dataArtisans/flink-dataflow
return value.getValue().getUnionTag() == outputTag;
}).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
@Override
public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception {
代码示例来源:origin: com.alibaba.blink/flink-examples-streaming
System.out.println("Set resource spec: " + resourceSpec.toString());
counts = ((SingleOutputStreamOperator<String>) text).setResources(resourceSpec)
.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
代码示例来源:origin: streampipes/streampipes-ce
@Override
public void prepareRuntime() throws SpRuntimeException {
if (debug) {
this.env = StreamExecutionEnvironment.createLocalEnvironment();
} else {
this.env = StreamExecutionEnvironment
.createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
}
appendEnvironmentConfig(this.env);
// Add the first source to the topology
DataStream<Map<String, Object>> messageStream1;
SourceFunction<String> source1 = getStream1Source();
if (source1 != null) {
messageStream1 = env
.addSource(source1).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph()));
} else {
throw new SpRuntimeException("At least one source must be defined for a flink sepa");
}
DataStream<Map<String, Object>> messageStream2;
SourceFunction<String> source2 = getStream2Source();
if (source2 != null) {
messageStream2 = env
.addSource(source2).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph()));
appendExecutionConfig(messageStream1, messageStream2);
} else {
appendExecutionConfig(messageStream1);
}
}
代码示例来源:origin: vasia/gelly-streaming
public static void main(String[] args) throws Exception {
// Set up the environment
if(!parseParameters(args)) {
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
int localSamples = samples / env.getParallelism();
// Count triangles
DataStream<Tuple2<Integer, Integer>> triangles = edges
.broadcast()
.flatMap(new TriangleSampler(localSamples, vertexCount))
.flatMap(new TriangleSummer(samples, vertexCount))
.setParallelism(1);
// Emit the results
if (fileOutput) {
triangles.writeAsCsv(outputPath);
} else {
triangles.print();
}
env.execute("Broadcast Triangle Count");
}
代码示例来源:origin: org.apache.flink/flink-cep_2.10
return patternStream.flatMap(
new PatternFlatSelectTimeoutWrapper<>(
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction),
代码示例来源:origin: vasia/gelly-streaming
@SuppressWarnings("unchecked")
@Override
public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
//For parallel window support we key the edge stream by partition and apply a parallel fold per partition.
//Finally, we merge all locally combined results into our final graph aggregation property.
TupleTypeInfo edgeTypeInfo = (TupleTypeInfo) edgeStream.getType();
TypeInformation<S> returnType = TypeExtractor.createTypeInfo(EdgesFold.class, getUpdateFun().getClass(), 2, edgeTypeInfo.getTypeAt(0), edgeTypeInfo.getTypeAt(2));
TypeInformation<Tuple2<Integer, Edge<K, EV>>> typeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, edgeStream.getType());
DataStream<S> partialAgg = edgeStream
.map(new PartitionMapper<>()).returns(typeInfo)
.keyBy(0)
.timeWindow(Time.of(timeMillis, TimeUnit.MILLISECONDS))
.fold(getInitialValue(), new PartialAgg<>(getUpdateFun(),returnType))
.timeWindowAll(Time.of(timeMillis, TimeUnit.MILLISECONDS))
.reduce(getCombineFun())
.flatMap(getAggregator(edgeStream)).setParallelism(1);
if (getTransform() != null) {
return partialAgg.map(getTransform());
}
return (DataStream<T>) partialAgg;
}
代码示例来源:origin: vasia/gelly-streaming
public static void main(String[] args) throws Exception {
// Set up the environment
if(!parseParameters(args)) {
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
int localSamples = samples / env.getParallelism();
// Count triangles
DataStream<Tuple2<Integer, Integer>> triangles = edges
.flatMap(new EdgeSampleMapper(localSamples, env.getParallelism()))
.setParallelism(1)
.keyBy(0)
.flatMap(new TriangleSampleMapper(localSamples, vertexCount))
.flatMap(new TriangleSummer(samples, vertexCount))
.setParallelism(1);
// Emit the results
if (fileOutput) {
triangles.writeAsCsv(outputPath);
} else {
triangles.print();
}
env.execute("Incidence Sampling Triangle Count");
}
代码示例来源:origin: vasia/gelly-streaming
@SuppressWarnings("unchecked")
@Override
public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
TypeInformation<Tuple2<Integer, Edge<K, EV>>> basicTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, edgeStream.getType());
TupleTypeInfo edgeTypeInfo = (TupleTypeInfo) edgeStream.getType();
TypeInformation<S> partialAggType = TypeExtractor.createTypeInfo(EdgesFold.class, getUpdateFun().getClass(), 2, edgeTypeInfo.getTypeAt(0), edgeTypeInfo.getTypeAt(2));
TypeInformation<Tuple2<Integer, S>> partialTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, partialAggType);
degree = (degree == -1) ? edgeStream.getParallelism() : degree;
DataStream<S> partialAgg = edgeStream
.map(new PartitionMapper<>()).returns(basicTypeInfo)
.setParallelism(degree)
.keyBy(0)
.timeWindow(Time.of(timeMillis, TimeUnit.MILLISECONDS))
.fold(getInitialValue(), new PartialAgg<>(getUpdateFun(), partialAggType)).setParallelism(degree);
//split here
DataStream<Tuple2<Integer, S>> treeAgg = enhance(partialAgg.map(new PartitionMapper<>()).setParallelism(degree).returns(partialTypeInfo), partialTypeInfo);
DataStream<S> resultStream = treeAgg.map(new PartitionStripper<>()).setParallelism(treeAgg.getParallelism())
.timeWindowAll(Time.of(timeMillis, TimeUnit.MILLISECONDS))
.reduce(getCombineFun())
.flatMap(getAggregator(edgeStream)).setParallelism(1);
return (getTransform() != null) ? resultStream.map(getTransform()) : (DataStream<T>) resultStream;
}
内容来源于网络,如有侵权,请联系作者删除!