org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.assignTimestampsAndWatermarks()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(5.7k)|赞(0)|评价(0)|浏览(146)

本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.assignTimestampsAndWatermarks()方法的一些代码示例,展示了SingleOutputStreamOperator.assignTimestampsAndWatermarks()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.assignTimestampsAndWatermarks()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:assignTimestampsAndWatermarks

SingleOutputStreamOperator.assignTimestampsAndWatermarks介绍

暂无

代码示例

代码示例来源:origin: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool pt = ParameterTool.fromArgs(args);
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  setupEnvironment(env, pt);
  KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
    .name("EventSource")
    .uid("EventSource")
    .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    .keyBy(Event::getKey);
  List<TypeSerializer<ComplexPayload>> stateSer =
    Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
  KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
    applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
    applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
  afterStatefulOperations
    .flatMap(createSemanticsCheckMapper(pt))
    .name("SemanticsCheckMapper")
    .addSink(new PrintSinkFunction<>());
  env.execute("General purpose test job");
}

代码示例来源:origin: apache/flink

.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(

代码示例来源:origin: dataArtisans/flink-dataflow

@Override
  public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
    PCollection<T> output = context.getOutput(transform);
    DataStream<WindowedValue<T>> source;
    if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
      UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
      source = context.getExecutionEnvironment()
          .addSource(flinkSource.getFlinkSource())
          .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
            @Override
            public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
              collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
            }
          }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
    } else {
      source = context.getExecutionEnvironment()
          .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
    }
    context.setOutputDataStream(output, source);
  }
}

代码示例来源:origin: streaming-olap/training

.assignTimestampsAndWatermarks(new BoundedWaterMark(60))
.keyBy(new KeySelector<HashMap<String,String>, Object>() {
  @Override

代码示例来源:origin: dataArtisans/oscon

sensorStream = sensorStream.assignTimestampsAndWatermarks(new SensorDataWatermarkAssigner());

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // map to events
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  // sort events
  events.keyBy((ConnectedCarEvent event) -> event.carId)
      .process(new SortFunction())
      .print();
  env.execute("Sort Connected Car Events");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // find segments
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  events.keyBy("carId")
      .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
      .apply(new CreateGapSegment())
      .print();
  env.execute("Driving Sessions");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // map to events
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  // find segments
  events.keyBy("carId")
      .window(GlobalWindows.create())
      .trigger(new SegmentingOutOfOrderTrigger())
      .evictor(new SegmentingEvictor())
      .apply(new CreateStoppedSegment())
      .print();
  env.execute("Driving Segments");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

.assignTimestampsAndWatermarks(new ConnectedCarAssigner())
.keyBy("carId");

相关文章

微信公众号

最新文章

更多