org.apache.flink.streaming.api.datastream.KeyedStream.intervalJoin()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(116)

本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.intervalJoin()方法的一些代码示例,展示了KeyedStream.intervalJoin()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KeyedStream.intervalJoin()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:intervalJoin

KeyedStream.intervalJoin介绍

[英]Join elements of this KeyedStream with elements of another KeyedStream over a time interval that can be specified with IntervalJoin#between(Time,Time).
[中]在可以使用IntervalJoin#between(time,time)指定的时间间隔内,将此KeyedStream的元素与另一KeyedStream的元素连接起来。

代码示例

代码示例来源:origin: apache/flink

@Test(expected = NullPointerException.class)
public void testFailsWithoutUpperBound() {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne
    .keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), null);
}

代码示例来源:origin: apache/flink

@Test(expected = NullPointerException.class)
public void testFailsWithoutLowerBound() {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne
    .keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(null, Time.milliseconds(1));
}

代码示例来源:origin: apache/flink

@Test(expected = UnsupportedTimeCharacteristicException.class)
public void testExecutionFailsInProcessingTime() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(0))
    .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
      @Override
      public void processElement(Tuple2<String, Integer> left,
        Tuple2<String, Integer> right, Context ctx,
        Collector<String> out) throws Exception {
        out.collect(left + ":" + right);
      }
    });
}

代码示例来源:origin: apache/flink

.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(Time.milliseconds(-1), Time.milliseconds(1))
.process(new CombineToStringJoinFunction())

代码示例来源:origin: apache/flink

@Test
public void testBoundsCanBeInclusive() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(2))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
  env.execute();
  expectInAnyOrder(
    "(key,0):(key,0)",
    "(key,0):(key,1)",
    "(key,0):(key,2)",
    "(key,1):(key,1)",
    "(key,1):(key,2)",
    "(key,2):(key,2)"
  );
}

代码示例来源:origin: apache/flink

@Test
public void testBoundsAreInclusiveByDefault() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(2))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
  env.execute();
  expectInAnyOrder(
    "(key,0):(key,0)",
    "(key,0):(key,1)",
    "(key,0):(key,2)",
    "(key,1):(key,1)",
    "(key,1):(key,2)",
    "(key,2):(key,2)"
  );
}

代码示例来源:origin: apache/flink

@Test
public void testBoundsCanBeExclusive() throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
    Tuple2.of("key", 0),
    Tuple2.of("key", 1),
    Tuple2.of("key", 2)
  ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());
  streamOne.keyBy(new Tuple2KeyExtractor())
    .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
    .between(Time.milliseconds(0), Time.milliseconds(2))
    .upperBoundExclusive()
    .lowerBoundExclusive()
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
  env.execute();
  expectInAnyOrder(
    "(key,0):(key,1)",
    "(key,1):(key,2)"
  );
}

代码示例来源:origin: apache/flink

.intervalJoin(streamTwo)

代码示例来源:origin: apache/flink

.intervalJoin(streamTwo)
.between(Time.milliseconds(0), Time.milliseconds(0))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {

代码示例来源:origin: apache/flink

.intervalJoin(source.keyBy((in) -> in))
    .between(Time.milliseconds(10L), Time.milliseconds(10L))
    .process(new TestProcessJoinFunction<>())
source.join(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
source.keyBy((in) -> in)
  .intervalJoin(source.keyBy((in) -> in))
  .between(Time.milliseconds(10L), Time.milliseconds(10L))
  .process(new TestProcessJoinFunction<Long, Long, String>())
  .returns(Types.STRING);
source.keyBy((in) -> in)
  .intervalJoin(source.keyBy((in) -> in))
  .between(Time.milliseconds(10L), Time.milliseconds(10L))
  .process(new TestProcessJoinFunction<>(), Types.STRING);

相关文章