org.apache.flink.streaming.api.datastream.KeyedStream.flatMap(): usage and code examples


This article collects Java code examples of the org.apache.flink.streaming.api.datastream.KeyedStream.flatMap() method and shows how it is used in practice. The examples are extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they have real reference value. Details of KeyedStream.flatMap() are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: KeyedStream
Method: flatMap

About KeyedStream.flatMap

flatMap applies a user-defined FlatMapFunction to each element of the keyed stream; every input element may produce zero, one, or more output elements, emitted through the provided Collector. Because the stream is keyed, a RichFlatMapFunction passed to this method can read and update keyed (per-key) state, which is the pattern most of the examples below demonstrate.
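
As a minimal orientation before the collected examples, here is a self-contained sketch of that pattern: a RichFlatMapFunction with keyed state applied after keyBy. The RunningSum class and its Tuple2<String, Long> input type are illustrative assumptions, not taken from any of the projects quoted below.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits a running sum per key. The ValueState is automatically scoped
// to the key of the element currently being processed.
public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

  private transient ValueState<Long> sum;

  @Override
  public void open(Configuration parameters) {
    sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Types.LONG));
  }

  @Override
  public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
    long current = sum.value() == null ? 0L : sum.value();
    current += value.f1;
    sum.update(current);
    out.collect(Tuple2.of(value.f0, current));
  }
}

Applied as stream.keyBy(t -> t.f0).flatMap(new RunningSum()), every key accumulates its own independent sum.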

Code examples

Code example source: apache/flink

.flatMap(new StateMachineMapper());

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

Code example source: apache/flink

.flatMap(new TestFlatMap());

Code example source: apache/flink

.flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
.addSink(new PrintSinkFunction<>());

Code example source: apache/flink

.flatMap(createSemanticsCheckMapper(pt))
.name(SEMANTICS_CHECK_MAPPER_NAME)
.uid("007")
.flatMap(createSlidingWindowCheckMapper(pt))
.uid("010")
.name(SLIDING_WINDOW_CHECK_MAPPER_NAME)

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
  TtlTestConfig config = TtlTestConfig.fromArgs(pt);
  StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
    .cleanupIncrementally(5, true)
    .cleanupFullSnapshot()
    .build();
  env
    .addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
    .name("TtlStateUpdateSource")
    .keyBy(TtlStateUpdate::getKey)
    .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
    .name("TtlVerifyUpdateFunction")
    .addSink(new PrintSinkFunction<>())
    .name("PrintFailedVerifications");
  env.execute("State TTL test job");
}
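
A StateTtlConfig like the one built above only takes effect once it is attached to a state descriptor inside a user function; the DataStream API calls themselves carry no TTL switch. Below is a minimal sketch of that wiring. The TtlAwareFlatMap class, its state name, and the 10-minute TTL are illustrative stand-ins (the job above builds its TTL from config.ttl), not part of TtlVerifyUpdateFunction.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed state with a TTL: entries that are not written within the TTL expire.
public class TtlAwareFlatMap extends RichFlatMapFunction<String, String> {

  private transient ValueState<String> lastValue;

  @Override
  public void open(Configuration parameters) {
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
      .cleanupFullSnapshot()
      .build();
    ValueStateDescriptor<String> descriptor =
      new ValueStateDescriptor<>("last-value", String.class);
    descriptor.enableTimeToLive(ttlConfig); // the TTL lives on the descriptor, not on the stream
    lastValue = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    String previous = lastValue.value(); // null if never set for this key, or expired
    lastValue.update(value);
    out.collect(previous == null ? value : previous + " -> " + value);
  }
}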

Code example source: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 * </pre>
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
  final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
  final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
  final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
  env.enableCheckpointing(200);
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // first vertex, chained to the source
      // this filter throttles the flow until at least one checkpoint
      // is complete, to make sure this program does not run without
      // any checkpoint completing
      .filter(new StringRichFilterFunction())
          // -------------- second vertex - one-to-one connected ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
          // -------------- third vertex - reducer and the sink ----------------
      .keyBy("prefix")
      .flatMap(new OnceFailingAggregator(failurePos))
      .addSink(new ValidatingSink());
}

Code example source: apache/flink

@Test
public void testNestedPojoFieldAccessor() throws Exception {
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.getConfig().disableObjectReuse();
  see.setParallelism(4);
  DataStream<Data> dataStream = see.fromCollection(elements);
  DataStream<Data> summedStream = dataStream
    .keyBy("aaa")
    .sum("stats.count")
    .keyBy("aaa")
    .flatMap(new FlatMapFunction<Data, Data>() {
      // verification only: checks that the nested field was summed correctly
      // per key; nothing is forwarded downstream
      Data[] first = new Data[3];
      @Override
      public void flatMap(Data value, Collector<Data> out) throws Exception {
        if (first[value.aaa] == null) {
          first[value.aaa] = value;
          if (value.stats.count != 123) {
            throw new RuntimeException("Expected stats.count to be 123");
          }
        } else {
          if (value.stats.count != 2 * 123) {
            throw new RuntimeException("Expected stats.count to be 2 * 123");
          }
        }
      }
    });
  summedStream.print();
  see.execute();
}

Code example source: apache/flink

.sum("sum")
.keyBy("aaa", "abc", "wxyz")
.flatMap(new FlatMapFunction<Data, Data>() {
  private static final long serialVersionUID = 788865239171396315L;
  Data[] first = new Data[3];

Code example source: apache/flink

.sum("sum")
.keyBy("aaa", "stats.count")
.flatMap(new FlatMapFunction<Data, Data>() {
  private static final long serialVersionUID = -3678267280397950258L;
  Data[] first = new Data[3];

Code example source: apache/flink

.flatMap(new DuplicateFilter())
.setParallelism(ITER_TEST_PARALLELISM)
.iterate();

Code example source: apache/flink

.flatMap(new StatefulFlatMap()).setParallelism(1)
.addSink(new CountingSink())
.setParallelism(1);

Code example source: apache/flink

.flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(

Code example source: apache/flink

.addSource(nonParallelSource).uid("CheckpointingSource1")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap1")
.keyBy(0)
.transform(
.addSource(parallelSource).uid("CheckpointingSource2")
.keyBy(0)
.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap2")
.keyBy(0)
.transform(

Code example source: apache/flink

.flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
.keyBy(0)
.flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
.keyBy(0)
.flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
.keyBy(0)
.transform(

Code example source: vasia/gelly-streaming

/**
 * Removes the duplicate edges by storing a neighborhood set for each vertex
 *
 * @return a graph stream with no duplicate edges
 */
@Override
public SimpleEdgeStream<K, EV> distinct() {
  DataStream<Edge<K, EV>> edgeStream = this.edges
      .keyBy(0)
      .flatMap(new DistinctEdgeMapper<K, EV>());
  return new SimpleEdgeStream<>(edgeStream, this.getContext());
}
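
The snippet does not show DistinctEdgeMapper itself. A plausible minimal sketch of the idea described in the Javadoc, keeping the set of already-seen neighbors in keyed state, might look like the following; the class name DistinctEdgeFlatMap and the state name are hypothetical.

import java.util.HashSet;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.util.Collector;

// Forwards an edge only the first time its target is seen for the current source vertex.
public class DistinctEdgeFlatMap<K, EV> extends RichFlatMapFunction<Edge<K, EV>, Edge<K, EV>> {

  private transient ValueState<HashSet<K>> neighbors;

  @Override
  @SuppressWarnings("unchecked")
  public void open(Configuration parameters) {
    neighbors = getRuntimeContext().getState(new ValueStateDescriptor<>(
      "neighbors", (Class<HashSet<K>>) (Class<?>) HashSet.class));
  }

  @Override
  public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
    HashSet<K> seen = neighbors.value();
    if (seen == null) {
      seen = new HashSet<>();
    }
    if (seen.add(edge.getTarget())) { // true only for a previously unseen neighbor
      neighbors.update(seen);
      out.collect(edge);
    }
  }
}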

Code example source: vasia/gelly-streaming

/**
 * Builds the neighborhood state by creating adjacency lists.
 * Neighborhoods are currently built using a TreeSet.
 *
 * @param directed if true, only the out-neighbors will be stored
 *                 otherwise both directions are considered
 * @return a stream of Tuple3, where the first 2 fields identify the edge processed
 * and the third field is the adjacency list that was updated by processing this edge.
 */
public DataStream<Tuple3<K, K, TreeSet<K>>> buildNeighborhood(boolean directed) {
  DataStream<Edge<K, EV>> edges = this.getEdges();
  if (!directed) {
    edges = this.undirected().getEdges();
  }
  return edges.keyBy(0).flatMap(new BuildNeighborhoods<K, EV>());
}
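
BuildNeighborhoods is likewise not shown. Following the Javadoc, a sketch that keeps each vertex's adjacency list in a TreeSet and emits it after every update could look like this; the NeighborhoodSketch name and the state name are hypothetical.

import java.util.TreeSet;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.util.Collector;

// Accumulates a per-vertex adjacency list in a TreeSet and emits it after every update.
public class NeighborhoodSketch<K extends Comparable<K>, EV>
    extends RichFlatMapFunction<Edge<K, EV>, Tuple3<K, K, TreeSet<K>>> {

  private transient ValueState<TreeSet<K>> adjacency;

  @Override
  @SuppressWarnings("unchecked")
  public void open(Configuration parameters) {
    adjacency = getRuntimeContext().getState(new ValueStateDescriptor<>(
      "adjacency", (Class<TreeSet<K>>) (Class<?>) TreeSet.class));
  }

  @Override
  public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, TreeSet<K>>> out) throws Exception {
    TreeSet<K> neighbors = adjacency.value();
    if (neighbors == null) {
      neighbors = new TreeSet<>();
    }
    neighbors.add(edge.getTarget());
    adjacency.update(neighbors);
    // emit the edge that triggered the update together with the updated adjacency list
    out.collect(Tuple3.of(edge.getSource(), edge.getTarget(), neighbors));
  }
}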

Code example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  SimpleEdgeStream<Integer, NullValue> edges = getGraphStream(env);
  DataStream<Tuple2<Integer, Integer>> result =
      edges.buildNeighborhood(false)
      .map(new ProjectCanonicalEdges())
      .keyBy(0, 1).flatMap(new IntersectNeighborhoods())
      .keyBy(0).flatMap(new SumAndEmitCounters());
  if (resultPath != null) {
    result.writeAsText(resultPath);
  }
  else {
    result.print();
  }
  env.execute("Exact Triangle Count");
}

Code example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  // Set up the environment
  if(!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<Long, Long>> edges = getEdgesDataSet(env);
  IterativeStream<Tuple2<Long, Long>> iteration = edges.iterate();
  DataStream<Tuple2<Long, Long>> result = iteration.closeWith(
      iteration.keyBy(0).flatMap(new AssignComponents()));
  // Emit the results
  result.print();
  env.execute("Streaming Connected Components");
}
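
AssignComponents is not shown here, and the project's real implementation maintains its own component-tracking data structure. As a heavily simplified illustration of the same feedback pattern, a min-label propagation flatMap that would fit this iteration is sketched below; MinLabelPropagation is hypothetical, and a production version would also need a termination criterion.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Tracks the smallest component label seen per vertex; when the label shrinks,
// the new label is propagated along the edge and fed back into the iteration.
public class MinLabelPropagation extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

  private transient ValueState<Long> label;

  @Override
  public void open(Configuration parameters) {
    label = getRuntimeContext().getState(new ValueStateDescriptor<>("label", Long.class));
  }

  @Override
  public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) throws Exception {
    Long stored = label.value();
    long current = stored == null ? edge.f0 : stored; // a vertex starts in its own component
    long smallest = Math.min(current, edge.f1);       // f1 carries the neighbor / candidate label
    if (stored == null || smallest < stored) {
      label.update(smallest);
      out.collect(Tuple2.of(edge.f1, smallest)); // keyBy(0) routes this back to the neighbor
    }
  }
}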

Code example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple3<Integer, Integer, EventType>> edges = getGraphStream(env);
  // 1. emit (vertexID, 1) or (vertexID, -1) for addition or deletion
  edges.flatMap(new EmitVerticesWithChange())
      // group by vertex ID and maintain degree per vertex
      .keyBy(0).flatMap(new VertexDegreeCounts())
      // group by degree and emit current count
      .keyBy(0).map(new DegreeDistributionMap())
      .writeAsText(resultPath);
  env.execute("Streaming Degree Distribution");
}
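
VertexDegreeCounts is not shown in the snippet. One plausible shape for it, suggested by the comments above: keep the current degree of each vertex in keyed state and, when it changes, emit a retraction for the old degree bucket and an addition for the new one, both consumed downstream keyed by degree. DegreeCountSketch and its state name are hypothetical.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Input: (vertexID, +1|-1). Output: (degree, +1|-1) updates for the degree histogram.
public class DegreeCountSketch extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

  private transient ValueState<Integer> degree;

  @Override
  public void open(Configuration parameters) {
    degree = getRuntimeContext().getState(new ValueStateDescriptor<>("degree", Integer.class));
  }

  @Override
  public void flatMap(Tuple2<Integer, Integer> change, Collector<Tuple2<Integer, Integer>> out) throws Exception {
    int old = degree.value() == null ? 0 : degree.value();
    int updated = old + change.f1; // change.f1 is +1 for an edge addition, -1 for a deletion
    degree.update(updated);
    if (old > 0) {
      out.collect(Tuple2.of(old, -1)); // retract the vertex from its previous degree bucket
    }
    if (updated > 0) {
      out.collect(Tuple2.of(updated, 1)); // count it under its new degree
    }
  }
}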
