Usage and code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.flatMap() method

Reposted by x33g5p2x on 2022-01-30, filed under Other

This article collects Java code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.flatMap() method and shows how SingleOutputStreamOperator.flatMap() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as practical references. Details of the SingleOutputStreamOperator.flatMap() method:
Package path: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: flatMap

Introduction to SingleOutputStreamOperator.flatMap

None available.
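
As a rough orientation: SingleOutputStreamOperator inherits flatMap from DataStream, and the operation hands each stream element to a FlatMapFunction that may emit zero, one, or several results through a Collector. The sketch below models only that contract in plain Java so it runs without a Flink dependency. The nested Collector and FlatMapFunction interfaces are local stand-ins shaped like Flink's, and applyFlatMap, TOKENIZER, and demo are hypothetical names introduced for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {

    /** Local stand-in shaped like Flink's Collector (assumption, not the Flink class). */
    interface Collector<T> {
        void collect(T record);
    }

    /** Local stand-in shaped like Flink's FlatMapFunction (assumption, not the Flink class). */
    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    /** Hypothetical helper: applies fn to every input and gathers everything it emits. */
    static <T, O> List<O> applyFlatMap(List<T> inputs, FlatMapFunction<T, O> fn) {
        List<O> results = new ArrayList<>();
        for (T input : inputs) {
            try {
                fn.flatMap(input, results::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return results;
    }

    /** Splits a line on whitespace and emits each non-empty token in lower case. */
    static final FlatMapFunction<String, String> TOKENIZER = (line, out) -> {
        for (String token : line.toLowerCase().split("\\s+")) {
            if (!token.isEmpty()) {
                out.collect(token);
            }
        }
    };

    /** Three inputs: one yields two tokens, one yields none, one yields two. */
    static List<String> demo() {
        return applyFlatMap(Arrays.asList("To be", "", "or not"), TOKENIZER);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [to, be, or, not]
    }
}
```

The empty input line contributes nothing to the output, which is the property that distinguishes flatMap from a one-to-one map.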

Code examples

Code example source: origin: apache/flink

.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")

Code example source: origin: apache/flink

.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")

Code example source: origin: org.apache.flink/flink-cep_2.10

/**
 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
 * can produce an arbitrary number of resulting elements.
 *
 * @param patternFlatSelectFunction The pattern flat select function which is called for each
 *                                  detected pattern sequence.
 * @param <R> Type of the resulting elements
 * @param outTypeInfo Explicit specification of output type.
 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 *         function.
 */
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
  SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
      CEPOperatorUtils.createPatternStream(inputStream, pattern);
  return patternStream.flatMap(
    new PatternFlatSelectMapper<>(
      patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    )).returns(outTypeInfo);
}
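
The Javadoc above says the flat select function is called once per detected pattern sequence and may produce an arbitrary number of results. A minimal plain-Java sketch of that behaviour follows, assuming a match is represented as a Map from pattern name to matched events, as in the listing. The interfaces are local stand-ins rather than the Flink CEP types, and flatSelectAll, PAIRS, match, and the pattern names "start" and "end" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlatSelectSketch {

    /** Local stand-in for a Flink-style Collector (assumption). */
    interface Collector<T> {
        void collect(T record);
    }

    /** Local stand-in shaped like the PatternFlatSelectFunction in the listing (assumption). */
    interface PatternFlatSelectFunction<IN, OUT> {
        void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
    }

    /** Hypothetical driver: calls fn once per match and gathers every emitted element. */
    static <IN, OUT> List<OUT> flatSelectAll(
            List<Map<String, List<IN>>> matches, PatternFlatSelectFunction<IN, OUT> fn) {
        List<OUT> results = new ArrayList<>();
        for (Map<String, List<IN>> match : matches) {
            try {
                fn.flatSelect(match, results::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return results;
    }

    /** Emits one "start->end" string per (start, end) pair in the match; may emit nothing. */
    static final PatternFlatSelectFunction<Integer, String> PAIRS = (pattern, out) -> {
        List<Integer> ends = pattern.getOrDefault("end", Collections.<Integer>emptyList());
        for (Integer start : pattern.getOrDefault("start", Collections.<Integer>emptyList())) {
            for (Integer end : ends) {
                out.collect(start + "->" + end);
            }
        }
    };

    /** Builds a match with the hypothetical pattern names "start" and "end". */
    static Map<String, List<Integer>> match(List<Integer> start, List<Integer> end) {
        Map<String, List<Integer>> m = new HashMap<>();
        m.put("start", start);
        m.put("end", end);
        return m;
    }

    /** Two matches: the first yields two results, the second yields none. */
    static List<String> demo() {
        List<Map<String, List<Integer>>> matches = new ArrayList<>();
        matches.add(match(Arrays.asList(1), Arrays.asList(2, 3)));
        matches.add(match(Arrays.asList(4), Collections.<Integer>emptyList()));
        return flatSelectAll(matches, PAIRS);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1->2, 1->3]
    }
}
```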

Code example source: origin: vasia/gelly-streaming

/**
 * Returns a global aggregate on the previously split vertex stream
 *
 * @param edgeMapper the mapper that converts the edge stream to a vertex stream
 * @param vertexMapper the mapper that aggregates vertex values
 * @param collectUpdates boolean specifying whether the aggregate should only be collected when there is an update
 * @param <VV> the return value type
 * @return a stream of the aggregated values
 */
public <VV> DataStream<VV> globalAggregate(FlatMapFunction<Edge<K, EV>, Vertex<K, VV>> edgeMapper,
    FlatMapFunction<Vertex<K, VV>, VV> vertexMapper, boolean collectUpdates) {
  DataStream<VV> result = this.edges.flatMap(edgeMapper)
      .setParallelism(1)
      .flatMap(vertexMapper)
      .setParallelism(1);
  if (collectUpdates) {
    result = result.flatMap(new GlobalAggregateMapper<VV>())
        .setParallelism(1);
  }
  return result;
}
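
The collectUpdates flag above adds a third flatMap stage so that an aggregate is emitted only when it changes. The listing does not show GlobalAggregateMapper, so the sketch below is an assumption about one way such a stage could work: it remembers the last forwarded value and suppresses repeats. RunningMax, EmitOnUpdate, apply, and demo are hypothetical names, and the interfaces are local stand-ins for the Flink types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmitOnUpdateSketch {

    /** Local stand-ins shaped like Flink's Collector and FlatMapFunction (assumption). */
    interface Collector<T> {
        void collect(T record);
    }

    interface FlatMapFunction<T, O> {
        void flatMap(T value, Collector<O> out) throws Exception;
    }

    /** Emits the running maximum after every input; stands in for an aggregating mapper. */
    static final class RunningMax implements FlatMapFunction<Integer, Integer> {
        private int max = Integer.MIN_VALUE;

        @Override
        public void flatMap(Integer value, Collector<Integer> out) {
            max = Math.max(max, value);
            out.collect(max);
        }
    }

    /** Forwards a value only when it differs from the previously forwarded one. */
    static final class EmitOnUpdate<V> implements FlatMapFunction<V, V> {
        private V previous; // null until the first element arrives

        @Override
        public void flatMap(V value, Collector<V> out) {
            if (!value.equals(previous)) {
                previous = value;
                out.collect(value);
            }
        }
    }

    /** Hypothetical helper: runs one function instance over all inputs, like parallelism 1. */
    static <T, O> List<O> apply(List<T> inputs, FlatMapFunction<T, O> fn) {
        List<O> results = new ArrayList<>();
        for (T input : inputs) {
            try {
                fn.flatMap(input, results::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return results;
    }

    /** Mirrors the optional third stage: filter repeats only when collectUpdates is true. */
    static List<Integer> demo(boolean collectUpdates) {
        List<Integer> result = apply(Arrays.asList(3, 1, 4, 4, 2, 5), new RunningMax());
        if (collectUpdates) {
            result = apply(result, new EmitOnUpdate<Integer>());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints [3, 3, 4, 4, 4, 5]
        System.out.println(demo(true));  // prints [3, 4, 5]
    }
}
```

Because the suppressing stage keeps its previous value in a single field, it only behaves as intended when one instance sees the whole stream, which is consistent with the setParallelism(1) calls in the listing.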

Code example source: origin: com.alibaba.blink/flink-examples-streaming

System.out.println("Set resource spec: " + resourceSpec.toString());
counts = ((SingleOutputStreamOperator<String>) text).setResources(resourceSpec)
  .flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)

Code example source: origin: dataArtisans/flink-dataflow

return value.getValue().getUnionTag() == outputTag;
}).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
  @Override
  public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception {

Code example source: origin: streampipes/streampipes-ce

@Override
public void prepareRuntime() throws SpRuntimeException {
  if (debug) {
    this.env = StreamExecutionEnvironment.createLocalEnvironment();
  } else {
    this.env = StreamExecutionEnvironment
        .createRemoteEnvironment(config.getHost(), config.getPort(), config.getJarFile());
  }
  appendEnvironmentConfig(this.env);
  // Add the first source to the topology
  DataStream<Map<String, Object>> messageStream1;
  SourceFunction<String> source1 = getStream1Source();
  if (source1 != null) {
    messageStream1 = env
        .addSource(source1).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph()));
  } else {
    throw new SpRuntimeException("At least one source must be defined for a flink sepa");
  }
  DataStream<Map<String, Object>> messageStream2;
  SourceFunction<String> source2 = getStream2Source();
  if (source2 != null) {
    messageStream2 = env
        .addSource(source2).flatMap(new JsonToMapFormat()).flatMap(new StatisticLogger(getGraph()));
    appendExecutionConfig(messageStream1, messageStream2);
  } else {
    appendExecutionConfig(messageStream1);
  }
}

Code example source: origin: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  // Set up the environment
  if(!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
  int localSamples = samples / env.getParallelism();
  // Count triangles
  DataStream<Tuple2<Integer, Integer>> triangles = edges
      .broadcast()
      .flatMap(new TriangleSampler(localSamples, vertexCount))
      .flatMap(new TriangleSummer(samples, vertexCount))
      .setParallelism(1);
  // Emit the results
  if (fileOutput) {
    triangles.writeAsCsv(outputPath);
  } else {
    triangles.print();
  }
  env.execute("Broadcast Triangle Count");
}

Code example source: origin: org.apache.flink/flink-cep_2.10

return patternStream.flatMap(
  new PatternFlatSelectTimeoutWrapper<>(
    patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction),

Code example source: origin: vasia/gelly-streaming

@SuppressWarnings("unchecked")
@Override
public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
  //For parallel window support we key the edge stream by partition and apply a parallel fold per partition.
  //Finally, we merge all locally combined results into our final graph aggregation property.
  TupleTypeInfo edgeTypeInfo = (TupleTypeInfo) edgeStream.getType();
  TypeInformation<S> returnType = TypeExtractor.createTypeInfo(EdgesFold.class, getUpdateFun().getClass(), 2, edgeTypeInfo.getTypeAt(0), edgeTypeInfo.getTypeAt(2));
  TypeInformation<Tuple2<Integer, Edge<K, EV>>> typeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, edgeStream.getType());
  DataStream<S> partialAgg = edgeStream
      .map(new PartitionMapper<>()).returns(typeInfo)
      .keyBy(0)
      .timeWindow(Time.of(timeMillis, TimeUnit.MILLISECONDS))
      .fold(getInitialValue(), new PartialAgg<>(getUpdateFun(),returnType))
      .timeWindowAll(Time.of(timeMillis, TimeUnit.MILLISECONDS))
      .reduce(getCombineFun())
      .flatMap(getAggregator(edgeStream)).setParallelism(1);
  if (getTransform() != null) {
    return partialAgg.map(getTransform());
  }
  return (DataStream<T>) partialAgg;
}

Code example source: origin: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  // Set up the environment
  if(!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
  int localSamples = samples / env.getParallelism();
  // Count triangles
  DataStream<Tuple2<Integer, Integer>> triangles = edges
      .flatMap(new EdgeSampleMapper(localSamples, env.getParallelism()))
      .setParallelism(1)
      .keyBy(0)
      .flatMap(new TriangleSampleMapper(localSamples, vertexCount))
      .flatMap(new TriangleSummer(samples, vertexCount))
      .setParallelism(1);
  // Emit the results
  if (fileOutput) {
    triangles.writeAsCsv(outputPath);
  } else {
    triangles.print();
  }
  env.execute("Incidence Sampling Triangle Count");
}

Code example source: origin: vasia/gelly-streaming

@SuppressWarnings("unchecked")
@Override
public DataStream<T> run(final DataStream<Edge<K, EV>> edgeStream) {
  TypeInformation<Tuple2<Integer, Edge<K, EV>>> basicTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, edgeStream.getType());
  TupleTypeInfo edgeTypeInfo = (TupleTypeInfo) edgeStream.getType();
  TypeInformation<S> partialAggType = TypeExtractor.createTypeInfo(EdgesFold.class, getUpdateFun().getClass(), 2, edgeTypeInfo.getTypeAt(0), edgeTypeInfo.getTypeAt(2));
  TypeInformation<Tuple2<Integer, S>> partialTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, partialAggType);
  degree = (degree == -1) ? edgeStream.getParallelism() : degree;
  
  DataStream<S> partialAgg = edgeStream
      .map(new PartitionMapper<>()).returns(basicTypeInfo)
      .setParallelism(degree)
      .keyBy(0)
      .timeWindow(Time.of(timeMillis, TimeUnit.MILLISECONDS))
      .fold(getInitialValue(), new PartialAgg<>(getUpdateFun(), partialAggType)).setParallelism(degree);
  //split here
  DataStream<Tuple2<Integer, S>> treeAgg = enhance(partialAgg.map(new PartitionMapper<>()).setParallelism(degree).returns(partialTypeInfo), partialTypeInfo);
  DataStream<S> resultStream = treeAgg.map(new PartitionStripper<>()).setParallelism(treeAgg.getParallelism())
      .timeWindowAll(Time.of(timeMillis, TimeUnit.MILLISECONDS))
      .reduce(getCombineFun())
      .flatMap(getAggregator(edgeStream)).setParallelism(1);
  return (getTransform() != null) ? resultStream.map(getTransform()) : (DataStream<T>) resultStream;
}
