本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.map()方法的一些代码示例，展示了KeyedStream.map()的具体用法。这些代码示例主要来源于GitHub、Stack Overflow、Maven等平台，是从一些精选项目中提取出来的代码，具有较强的参考意义，能在一定程度上帮助到你。KeyedStream.map()方法的具体详情如下：
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:map
暂无
代码示例来源:origin: apache/flink
/**
 * Runs {@code source} through an artificial keyed-state mapper and re-keys the result by
 * {@code Event#getKey}.
 *
 * <p>The mapper is created from an identity map function together with {@code stateFunc},
 * {@code stateSer} and {@code stateClass}. The resulting operator uses {@code name} both as its
 * display name and as its uid, and its output type is declared explicitly as {@code Event}.
 *
 * @param name operator name and uid
 * @param stateFunc function combining an event with the previous payload into the new payload
 * @param source keyed input stream
 * @param stateSer serializers handed to the mapper factory
 * @param stateClass state classes handed to the mapper factory
 * @return the mapped stream, keyed again by {@code Event#getKey}
 */
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
        String name,
        JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
        KeyedStream<Event, Integer> source,
        List<TypeSerializer<ComplexPayload>> stateSer,
        List<Class<ComplexPayload>> stateClass) {
    final KeyedStream<Event, Integer> rekeyed = source
            .map(createArtificialKeyedStateMapper(event -> event, stateFunc, stateSer, stateClass))
            .name(name)
            .uid(name)
            // presumably needed because the lambda's output type cannot be extracted automatically
            .returns(Event.class)
            .keyBy(Event::getKey);
    return rekeyed;
}
代码示例来源:origin: apache/flink
.map(new CheckpointBlockingFunction());
代码示例来源:origin: apache/flink
.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());
代码示例来源:origin: apache/flink
/**
 * Entry point: generates data, keys it by field 0, applies {@code SubtractingMapper} and writes
 * the result through a {@code BucketingSink} rooted at the required {@code outputPath} argument.
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String outputPath = params.getRequired("outputPath");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // at most 3 restart attempts, 10 seconds apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    env.enableCheckpointing(4000);

    final int idlenessMs = 10;

    // sink whose buckets are chosen by KeyBucketer; setBucketer configures the sink in place
    final BucketingSink<Tuple4<Integer, Long, Integer, String>> sink =
            new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath);
    sink.setBucketer(new KeyBucketer());

    // source -> keyBy(0) -> stateful mapper -> bucketing sink
    env.addSource(new Generator(10, idlenessMs, 60))
            .keyBy(0)
            .map(new SubtractingMapper(-1L * idlenessMs))
            .addSink(sink);

    env.execute();
}
代码示例来源:origin: apache/flink
/**
 * Builds a chain of four keyed no-op maps under a global parallelism of 42 and verifies the max
 * parallelism recorded in the {@link StreamGraph} for the maps whose max parallelism was set
 * explicitly.
 *
 * <p>NOTE(review): despite the test name, only the two nodes with an explicitly configured max
 * parallelism (3 and 4) are asserted; nodes 1 and 2 are built but not checked.
 */
@Test
public void testAutoMaxParallelism() {
    final int globalParallelism = 42;
    final int mapParallelism = 17;
    final int maxParallelism = 21;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(globalParallelism);

    final DataStream<Integer> source = env.fromElements(1, 2, 3);

    // 1: no operator-level settings
    final DataStream<Integer> keyedResult1 = source
            .keyBy(value -> value)
            .map(new NoOpIntMap());
    // 2: explicit parallelism only
    final DataStream<Integer> keyedResult2 = keyedResult1
            .keyBy(value -> value)
            .map(new NoOpIntMap())
            .setParallelism(mapParallelism);
    // 3: explicit max parallelism only
    final DataStream<Integer> keyedResult3 = keyedResult2
            .keyBy(value -> value)
            .map(new NoOpIntMap())
            .setMaxParallelism(maxParallelism);
    // 4: explicit max parallelism and parallelism
    final DataStream<Integer> keyedResult4 = keyedResult3
            .keyBy(value -> value)
            .map(new NoOpIntMap())
            .setMaxParallelism(maxParallelism)
            .setParallelism(mapParallelism);

    keyedResult4.addSink(new DiscardingSink<>());

    final StreamGraph graph = env.getStreamGraph();

    assertEquals(maxParallelism, graph.getStreamNode(keyedResult3.getId()).getMaxParallelism());
    assertEquals(maxParallelism, graph.getStreamNode(keyedResult4.getId()).getMaxParallelism());
}
代码示例来源:origin: apache/flink
.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
代码示例来源:origin: apache/flink
/**
 * Entry point: runs a single-key stateful job with file-system state, 1s checkpoints, no
 * restarts, and explicit uids on source, map and sink.
 */
public static void main(String[] args) throws Exception {
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String checkpointDir = params.getRequired("checkpoint.dir");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend(checkpointDir));
    // any failure ends the job
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.enableCheckpointing(1000L);
    env.getConfig().disableGenericTypes();

    // every record is routed to the same key (0)
    env.addSource(new MySource())
            .uid("my-source")
            .keyBy(ignored -> 0)
            .map(new MyStatefulFunction())
            .uid("my-map")
            .addSink(new DiscardingSink<>())
            .uid("my-sink");

    env.execute();
}
}
代码示例来源:origin: apache/flink
/**
 * Tests that a keyed map's single input edge in the {@link StreamGraph} uses the key-group
 * partitioner and that the keyed operator picks up the max parallelism configured on the
 * environment.
 *
 * <p>NOTE(review): the partitioner's own key-group count is not asserted; it may only be
 * configured once the job is deployed - confirm before comparing it to {@code maxParallelism}.
 */
@Test
public void testSetupOfKeyGroupPartitioner() {
    int maxParallelism = 42;
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setMaxParallelism(maxParallelism);

    DataStream<Integer> source = env.fromElements(1, 2, 3);
    DataStream<Integer> keyedResult = source.keyBy(value -> value).map(new NoOpIntMap());
    keyedResult.addSink(new DiscardingSink<>());

    StreamGraph graph = env.getStreamGraph();
    StreamNode keyedResultNode = graph.getStreamNode(keyedResult.getId());

    // the map has exactly one upstream (the source), so get(0) is that edge
    assertEquals(1, keyedResultNode.getInEdges().size());
    StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner();

    // keyBy must put a key-group partitioner on the edge into the map
    assertEquals("KeyGroupStreamPartitioner", streamPartitioner.getClass().getSimpleName());
    // the environment-wide max parallelism must reach the keyed operator
    assertEquals(maxParallelism, keyedResultNode.getMaxParallelism());
}
代码示例来源:origin: apache/flink
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
.map(createArtificialKeyedStateMapper(
代码示例来源:origin: apache/flink
/**
 * Entry point: reads {@code KafkaEvent}s from the Kafka 0.10 topic {@code input-topic}, keys
 * them by {@code "word"}, applies {@code RollingAdditionMapper}, and writes the results to
 * {@code output-topic}.
 */
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    // the watermark extractor is attached to the consumer itself
    FlinkKafkaConsumer010<KafkaEvent> consumer = new FlinkKafkaConsumer010<>(
            parameterTool.getRequired("input-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties());
    consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());

    DataStream<KafkaEvent> input = env
            .addSource(consumer)
            .keyBy("word")
            .map(new RollingAdditionMapper());

    FlinkKafkaProducer010<KafkaEvent> producer = new FlinkKafkaProducer010<>(
            parameterTool.getRequired("output-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties());
    input.addSink(producer);

    env.execute("Kafka 0.10 Example");
}
代码示例来源:origin: apache/flink
/**
 * Entry point: reads {@code KafkaEvent}s from the Kafka 0.11 topic {@code input-topic}, keys
 * them by {@code "word"}, applies {@code RollingAdditionMapper}, and writes the results to
 * {@code output-topic}.
 */
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    // the watermark extractor is attached to the consumer itself
    FlinkKafkaConsumer011<KafkaEvent> consumer = new FlinkKafkaConsumer011<>(
            parameterTool.getRequired("input-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties());
    consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());

    DataStream<KafkaEvent> input = env
            .addSource(consumer)
            .keyBy("word")
            .map(new RollingAdditionMapper());

    FlinkKafkaProducer011<KafkaEvent> producer = new FlinkKafkaProducer011<>(
            parameterTool.getRequired("output-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties());
    input.addSink(producer);

    env.execute("Kafka 0.11 Example");
}
代码示例来源:origin: apache/flink
/**
 * Entry point: reads {@code KafkaEvent}s from {@code input-topic} with the universal Kafka
 * connector, keys them by {@code "word"}, applies {@code RollingAdditionMapper}, and writes the
 * results to {@code output-topic} using the {@code EXACTLY_ONCE} producer semantic.
 */
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    // the watermark extractor is attached to the consumer itself
    FlinkKafkaConsumer<KafkaEvent> consumer = new FlinkKafkaConsumer<>(
            parameterTool.getRequired("input-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties());
    consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());

    DataStream<KafkaEvent> input = env
            .addSource(consumer)
            .keyBy("word")
            .map(new RollingAdditionMapper());

    // the plain schema is wrapped into a keyed serialization schema for this producer
    FlinkKafkaProducer<KafkaEvent> producer = new FlinkKafkaProducer<>(
            parameterTool.getRequired("output-topic"),
            new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
            parameterTool.getProperties(),
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    input.addSink(producer);

    env.execute("Modern Kafka Example");
}
代码示例来源:origin: apache/flink
/**
 * Verifies that a keyed operator without its own setting reports the environment-wide max
 * parallelism, while an operator-level {@code setMaxParallelism} takes precedence.
 */
@Test
public void testMaxParallelismForwarding() {
    final int globalMaxParallelism = 42;
    final int keyedResult2MaxParallelism = 17;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setMaxParallelism(globalMaxParallelism);

    final DataStream<Integer> source = env.fromElements(1, 2, 3);

    // expected to report the global value
    final DataStream<Integer> keyedResult1 = source
            .keyBy(value -> value)
            .map(new NoOpIntMap());

    // overrides the global value on the operator itself
    final DataStream<Integer> keyedResult2 = keyedResult1
            .keyBy(value -> value)
            .map(new NoOpIntMap())
            .setMaxParallelism(keyedResult2MaxParallelism);

    keyedResult2.addSink(new DiscardingSink<>());

    final StreamGraph graph = env.getStreamGraph();

    assertEquals(globalMaxParallelism,
            graph.getStreamNode(keyedResult1.getId()).getMaxParallelism());
    assertEquals(keyedResult2MaxParallelism,
            graph.getStreamNode(keyedResult2.getId()).getMaxParallelism());
}
代码示例来源:origin: apache/flink
.map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
代码示例来源:origin: apache/flink
.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
private static final long serialVersionUID = 1L;
代码示例来源:origin: apache/flink
/**
 * Assembles the program under test.
 *
 * <pre>
 * [ (source)->(filter) ]-shuffle->[ (map)->(map) ]-keyBy->[ (map)->(sink) ]
 * </pre>
 *
 * <p>Generated strings are filtered and shuffled, mapped to prefix counts in a newly started
 * chain, passed through {@code StatefulCounterFunction}, keyed by {@code "prefix"}, processed by
 * {@code OnceFailingPrefixCounter}, and finally handed to a sink that ignores every record.
 */
@Override
public void testProgram(StreamExecutionEnvironment env) {
    // first vertex: generating source with a chained filter, followed by a shuffle
    final DataStream<String> shuffled = env
            .addSource(new StringGeneratingSourceFunction(NUM_STRINGS))
            .filter(new StringRichFilterFunction())
            .shuffle();

    final DataStream<PrefixCount> counted = shuffled
            // second vertex: a new chain starting with the prefix-count mapper
            .map(new StringPrefixCountRichMapFunction())
            .startNewChain()
            .map(new StatefulCounterFunction())
            // third vertex: keyed counter (presumably fails once, per its name) plus the sink
            .keyBy("prefix")
            .map(new OnceFailingPrefixCounter(NUM_STRINGS));

    counted.addSink(new SinkFunction<PrefixCount>() {
        @Override
        public void invoke(PrefixCount value) throws Exception {
            // intentionally discards every record
        }
    });
}
代码示例来源:origin: apache/flink
.map(new SubtaskIndexAssigner())
.addSink(hashPartitionResultSink);
代码示例来源:origin: apache/flink
.map(new TestMapFunction())
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
代码示例来源:origin: intel-hadoop/HiBench
.map(new RichMapFunction<Tuple2<String, Tuple2<String, Integer>>, Tuple2<String, Tuple2<String, Integer>>>() {
private transient ValueState<Integer> sum;
代码示例来源:origin: king/bravo
/**
 * Builds the restore-side pipeline.
 *
 * <p>Each input string is parsed to an {@code Integer}, keyed by the value itself, and run
 * through {@code StatefulCounter2} (uid {@code "hello"}). Each resulting {@code Tuple3} is then
 * rendered with its {@code toString}.
 *
 * @param source stream of numeric strings
 * @return the stringified counter output
 */
public DataStream<String> restoreTestPipeline(DataStream<String> source) {
    final DataStream<Integer> parsed = source
            .map(Integer::parseInt)
            .returns(Integer.class);

    return parsed
            .keyBy(i -> i)
            .map(new StatefulCounter2())
            .uid("hello")
            .map(Tuple3::toString)
            .returns(String.class);
}
内容来源于网络,如有侵权,请联系作者删除!