org.apache.flink.streaming.api.datastream.KeyedStream.map()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(150)

本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.map()方法的一些代码示例,展示了KeyedStream.map()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KeyedStream.map()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:map

KeyedStream.map介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Applies the artificial keyed-state map operator to {@code source} and
 * re-keys the resulting stream by the event key.
 *
 * <p>NOTE(review): createArtificialKeyedStateMapper is defined elsewhere in
 * this file; the identity lambda leaves the event payload unchanged.
 *
 * @param name       operator name, also used as its uid
 * @param stateFunc  join function applied against the keyed state
 * @param stateSer   serializers for the artificial state
 * @param stateClass state classes for the artificial state
 * @return the mapped stream, keyed again by {@code Event::getKey}
 */
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
  String name,
  JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
  KeyedStream<Event, Integer> source,
  List<TypeSerializer<ComplexPayload>> stateSer,
  List<Class<ComplexPayload>> stateClass) {
  return source
    .map(createArtificialKeyedStateMapper(event -> event, stateFunc, stateSer, stateClass))
    .name(name)
    .uid(name)
    .returns(Event.class)
    .keyBy(Event::getKey);
}

代码示例来源:origin: apache/flink

.map(new CheckpointBlockingFunction());

代码示例来源:origin: apache/flink

.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  // Required argument: directory the bucketing sink writes to.
  ParameterTool params = ParameterTool.fromArgs(args);
  String outputPath = params.getRequired("outputPath");

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // Retry the job up to 3 times, 10 seconds apart, before failing it.
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
  env.enableCheckpointing(4000);

  final int idlenessMs = 10;

  // Bucketing sink that groups output files by key.
  BucketingSink<Tuple4<Integer, Long, Integer, String>> bucketingSink =
      new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
          .setBucketer(new KeyBucketer());

  // Pipeline: generate events, key them, apply the stateful mapper, sink.
  env.addSource(new Generator(10, idlenessMs, 60))
      .keyBy(0)
      .map(new SubtractingMapper(-1L * idlenessMs))
      .addSink(bucketingSink);

  env.execute();
}

代码示例来源:origin: apache/flink

/**
 * Tests that the max parallelism is automatically set to the parallelism if it has not been
 * specified.
 */
@Test
public void testAutoMaxParallelism() {
  final int globalParallelism = 42;
  final int mapParallelism = 17;
  final int maxParallelism = 21;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(globalParallelism);

  DataStream<Integer> source = env.fromElements(1, 2, 3);

  // Four keyed maps with different combinations of explicit parallelism and
  // max parallelism to exercise the fallback rules.
  DataStream<Integer> keyedResult1 = source.keyBy(v -> v).map(new NoOpIntMap());
  DataStream<Integer> keyedResult2 =
      keyedResult1.keyBy(v -> v).map(new NoOpIntMap()).setParallelism(mapParallelism);
  DataStream<Integer> keyedResult3 =
      keyedResult2.keyBy(v -> v).map(new NoOpIntMap()).setMaxParallelism(maxParallelism);
  DataStream<Integer> keyedResult4 =
      keyedResult3
          .keyBy(v -> v)
          .map(new NoOpIntMap())
          .setMaxParallelism(maxParallelism)
          .setParallelism(mapParallelism);

  keyedResult4.addSink(new DiscardingSink<>());

  StreamGraph graph = env.getStreamGraph();
  StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
  StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());

  // An explicitly configured max parallelism must survive graph translation.
  assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
  assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
}

代码示例来源:origin: apache/flink

.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
    // Required argument: where checkpoints are stored.
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String checkpointDir = params.getRequired("checkpoint.dir");

    // Checkpoint every second to the given directory; never restart on
    // failure so problems surface immediately. Generic (Kryo) types are
    // disabled to catch accidental fallback serialization.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(checkpointDir));
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.enableCheckpointing(1000L);
    env.getConfig().disableGenericTypes();

    // Every record maps to key 0, so all state lives under a single key.
    env.addSource(new MySource()).uid("my-source")
        .keyBy(x -> 0)
        .map(new MyStatefulFunction()).uid("my-map")
        .addSink(new DiscardingSink<>()).uid("my-sink");

    env.execute();
  }
}

代码示例来源:origin: apache/flink

/**
 * Tests that the KeyGroupStreamPartitioner are properly set up with the correct value of
 * maximum parallelism.
 */
@Test
public void testSetupOfKeyGroupPartitioner() {
  int maxParallelism = 42;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().setMaxParallelism(maxParallelism);
  DataStream<Integer> source = env.fromElements(1, 2, 3);
  DataStream<Integer> keyedResult = source.keyBy(value -> value).map(new NoOpIntMap());
  keyedResult.addSink(new DiscardingSink<>());
  StreamGraph graph = env.getStreamGraph();
  StreamNode keyedResultNode = graph.getStreamNode(keyedResult.getId());
  StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner();
  // Fix: the original snippet computed the partitioner but asserted nothing,
  // so the test could never fail. Verify what the javadoc promises: the keyed
  // edge uses a KeyGroupStreamPartitioner and the configured max parallelism
  // is propagated to the node.
  assertTrue(streamPartitioner instanceof KeyGroupStreamPartitioner);
  assertEquals(maxParallelism, keyedResultNode.getMaxParallelism());
}

代码示例来源:origin: apache/flink

.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
.map(createArtificialKeyedStateMapper(

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  // Read Kafka connection settings from the command line.
  final ParameterTool parameterTool = ParameterTool.fromArgs(args);
  StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

  // Source: Kafka 0.10 consumer with custom watermarks; key by word and
  // maintain a rolling sum per key.
  DataStream<KafkaEvent> rollingCounts =
      env.addSource(
            new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
              .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
          .keyBy("word")
          .map(new RollingAdditionMapper());

  // Sink: write the updated events back to Kafka.
  rollingCounts.addSink(
      new FlinkKafkaProducer010<>(
          parameterTool.getRequired("output-topic"),
          new KafkaEventSchema(),
          parameterTool.getProperties()));

  env.execute("Kafka 0.10 Example");
}

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  // Read Kafka connection settings from the command line.
  final ParameterTool parameterTool = ParameterTool.fromArgs(args);
  StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

  // Source: Kafka 0.11 consumer with custom watermarks; key by word and
  // maintain a rolling sum per key.
  DataStream<KafkaEvent> rollingCounts =
      env.addSource(
            new FlinkKafkaConsumer011<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
              .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
          .keyBy("word")
          .map(new RollingAdditionMapper());

  // Sink: write the updated events back to Kafka.
  rollingCounts.addSink(
      new FlinkKafkaProducer011<>(
          parameterTool.getRequired("output-topic"),
          new KafkaEventSchema(),
          parameterTool.getProperties()));

  env.execute("Kafka 0.11 Example");
}

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  // Read Kafka connection settings from the command line.
  final ParameterTool parameterTool = ParameterTool.fromArgs(args);
  StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

  // Source: universal Kafka consumer with custom watermarks; key by word and
  // maintain a rolling sum per key.
  DataStream<KafkaEvent> rollingCounts =
      env.addSource(
            new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
              .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
          .keyBy("word")
          .map(new RollingAdditionMapper());

  // Sink: write back to Kafka with exactly-once (transactional) semantics.
  rollingCounts.addSink(
      new FlinkKafkaProducer<>(
          parameterTool.getRequired("output-topic"),
          new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
          parameterTool.getProperties(),
          FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

  env.execute("Modern Kafka Example");
}

代码示例来源:origin: apache/flink

/**
 * Tests that the global and operator-wide max parallelism setting is respected.
 */
@Test
public void testMaxParallelismForwarding() {
  final int globalMaxParallelism = 42;
  final int keyedResult2MaxParallelism = 17;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().setMaxParallelism(globalMaxParallelism);

  DataStream<Integer> source = env.fromElements(1, 2, 3);

  // The first map inherits the global max parallelism; the second overrides it.
  DataStream<Integer> keyedResult1 = source.keyBy(v -> v).map(new NoOpIntMap());
  DataStream<Integer> keyedResult2 =
      keyedResult1
          .keyBy(v -> v)
          .map(new NoOpIntMap())
          .setMaxParallelism(keyedResult2MaxParallelism);

  keyedResult2.addSink(new DiscardingSink<>());

  StreamGraph graph = env.getStreamGraph();

  assertEquals(globalMaxParallelism,
      graph.getStreamNode(keyedResult1.getId()).getMaxParallelism());
  assertEquals(keyedResult2MaxParallelism,
      graph.getStreamNode(keyedResult2.getId()).getMaxParallelism());
}

代码示例来源:origin: apache/flink

.map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {

代码示例来源:origin: apache/flink

.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
  private static final long serialVersionUID = 1L;

代码示例来源:origin: apache/flink

/**
 * Runs the following program.
 * <pre>
 *     [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
 * </pre>
 *
 * <p>The shuffle() after the filter and the startNewChain() before the
 * stateful counter split the pipeline into the vertices sketched above.
 *
 * @param env the execution environment the test topology is built on
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
  DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
  stream
      // -------------- first vertex, chained to the source ----------------
      .filter(new StringRichFilterFunction())
      .shuffle()
      // -------------- second vertex - the stateful one that also fails ----------------
      .map(new StringPrefixCountRichMapFunction())
      .startNewChain()
      .map(new StatefulCounterFunction())
      // -------------- third vertex - counter and the sink ----------------
      .keyBy("prefix")
      .map(new OnceFailingPrefixCounter(NUM_STRINGS))
      // The sink intentionally discards every record; presumably the counts
      // are checked via operator state elsewhere — confirm against the test class.
      .addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
          // Do nothing here
        }
      });
}

代码示例来源:origin: apache/flink

.map(new SubtaskIndexAssigner())
.addSink(hashPartitionResultSink);

代码示例来源:origin: apache/flink

.map(new TestMapFunction())
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

代码示例来源:origin: intel-hadoop/HiBench

.map(new RichMapFunction<Tuple2<String, Tuple2<String, Integer>>, Tuple2<String, Tuple2<String, Integer>>>() {
 private transient ValueState<Integer> sum;

代码示例来源:origin: king/bravo

/**
 * Rebuilds the pipeline used for state restore: parse each line to an int,
 * key by the value itself, run it through the stateful counter (uid "hello"),
 * and render the counter's tuple output back to a string.
 */
public DataStream<String> restoreTestPipeline(DataStream<String> source) {
  return source
      .map(s -> Integer.parseInt(s))
      .returns(Integer.class)
      .keyBy(value -> value)
      .map(new StatefulCounter2())
      .uid("hello")
      .map(Tuple3::toString)
      .returns(String.class);
}

相关文章