This article collects code examples for the Java method org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.assignTimestampsAndWatermarks(), illustrating how SingleOutputStreamOperator.assignTimestampsAndWatermarks() is used in practice. The examples were extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they are reasonably representative references. Details of SingleOutputStreamOperator.assignTimestampsAndWatermarks() are as follows:
Package path: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: assignTimestampsAndWatermarks
Description: none available
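assignTimestampsAndWatermarks() attaches a timestamp/watermark assigner to the stream. A common strategy, which assigners like the extractors in the examples below typically follow, is bounded out-of-orderness: track the highest event timestamp seen so far and emit a watermark that trails it by the allowed lateness. The following is a minimal pure-Java sketch of that logic only (it is not Flink API code; the class and method names are illustrative):

```java
// Illustrative sketch of bounded-out-of-orderness watermarking, not Flink code.
// Track the maximum event timestamp seen so far; the watermark is that maximum
// minus the configured out-of-orderness bound, and it never moves backwards.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per element with that element's event timestamp.
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // Current watermark: a promise that no element with a timestamp at or
    // below this value is still expected.
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

Note how a late element (one with a timestamp below the current maximum) does not pull the watermark backwards; it only risks arriving after the watermark has already passed its timestamp.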
Code example source: apache/flink
public static void main(String[] args) throws Exception {
    final ParameterTool pt = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    setupEnvironment(env, pt);

    KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
        .name("EventSource")
        .uid("EventSource")
        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
        .keyBy(Event::getKey);

    List<TypeSerializer<ComplexPayload>> stateSer =
        Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));

    KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
        applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
        applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());

    afterStatefulOperations
        .flatMap(createSemanticsCheckMapper(pt))
        .name("SemanticsCheckMapper")
        .addSink(new PrintSinkFunction<>());

    env.execute("General purpose test job");
}
Code example source: apache/flink (fragment)
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
Code example source: dataArtisans/flink-dataflow
@Override
public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
    PCollection<T> output = context.getOutput(transform);

    DataStream<WindowedValue<T>> source;
    if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
        UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
        source = context.getExecutionEnvironment()
            .addSource(flinkSource.getFlinkSource())
            .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
                @Override
                public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
                    collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
                }
            }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
    } else {
        source = context.getExecutionEnvironment()
            .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
    }
    context.setOutputDataStream(output, source);
}
}
Code example source: streaming-olap/training (fragment)
.assignTimestampsAndWatermarks(new BoundedWaterMark(60))
.keyBy(new KeySelector<HashMap<String,String>, Object>() {
@Override
Code example source: dataArtisans/oscon
sensorStream = sensorStream.assignTimestampsAndWatermarks(new SensorDataWatermarkAssigner());
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // connect to the data file
    DataStream<String> carData = env.readTextFile(input);

    // map to events
    DataStream<ConnectedCarEvent> events = carData
        .map((String line) -> ConnectedCarEvent.fromString(line))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // sort events
    events.keyBy((ConnectedCarEvent event) -> event.carId)
        .process(new SortFunction())
        .print();

    env.execute("Sort Connected Car Events");
}
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // connect to the data file
    DataStream<String> carData = env.readTextFile(input);

    // find segments
    DataStream<ConnectedCarEvent> events = carData
        .map((String line) -> ConnectedCarEvent.fromString(line))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    events.keyBy("carId")
        .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
        .apply(new CreateGapSegment())
        .print();

    env.execute("Driving Sessions");
}
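The example above groups each car's events into event-time session windows with a 15-second gap (EventTimeSessionWindows.withGap(Time.seconds(15))): a window stays open as long as consecutive events arrive within the gap, and closes once the gap is exceeded. The following is a minimal pure-Java sketch of that gap semantics over already-sorted timestamps (illustrative only, not Flink API code; the class and method names are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of session-window grouping by gap, not Flink code.
// Given timestamps in ascending order, start a new session whenever two
// consecutive timestamps are more than gapMs apart.
class SessionGapSketch {
    static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                // gap exceeded: close the current session, start a new one
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

In the real operator the grouping runs incrementally on unordered input and is driven by watermarks, but the window boundaries it produces follow this same gap rule.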
Code example source: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // connect to the data file
    DataStream<String> carData = env.readTextFile(input);

    // map to events
    DataStream<ConnectedCarEvent> events = carData
        .map((String line) -> ConnectedCarEvent.fromString(line))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // find segments
    events.keyBy("carId")
        .window(GlobalWindows.create())
        .trigger(new SegmentingOutOfOrderTrigger())
        .evictor(new SegmentingEvictor())
        .apply(new CreateStoppedSegment())
        .print();

    env.execute("Driving Segments");
}
Code example source: dataArtisans/flink-training-exercises (fragment)
.assignTimestampsAndWatermarks(new ConnectedCarAssigner())
.keyBy("carId");