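This page collects code examples for the Java method org.apache.flink.streaming.api.datastream.KeyedStream.intervalJoin() and shows how KeyedStream.intervalJoin() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of KeyedStream.intervalJoin() are as follows: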
Package path: org.apache.flink.streaming.api.datastream.KeyedStream
Class name: KeyedStream
Method name: intervalJoin
Description: Join elements of this KeyedStream with elements of another KeyedStream over a time interval that can be specified with IntervalJoin#between(Time,Time).
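To make the description concrete, here is a minimal plain-Java sketch of the join condition, not Flink code. It assumes that a left element with timestamp t is paired with every right element of the same key whose timestamp lies in [t + lowerBound, t + upperBound], both ends inclusive. The names IntervalJoinSketch, Event, and join are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the interval-join condition; this is NOT Flink's implementation.
class IntervalJoinSketch {

    // Hypothetical element type: a key plus an event-time timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + key + "," + timestamp + ")";
        }
    }

    // Pairs each left element with every right element that has the same key and a
    // timestamp in [left.timestamp + lower, left.timestamp + upper] (inclusive bounds).
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inInterval = r.timestamp >= l.timestamp + lower
                        && r.timestamp <= l.timestamp + upper;
                if (sameKey && inInterval) {
                    out.add(l + ":" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> stream = Arrays.asList(
                new Event("key", 0), new Event("key", 1), new Event("key", 2));
        // Joining the stream with itself over the interval [0, 2].
        System.out.println(join(stream, stream, 0, 2));
    }
}
```

With the three elements above and the interval [0, 2], the sketch yields six pairs, which matches the expectation of the inclusive-bounds tests further down if the integer field of each tuple is taken as its timestamp (an assumption suggested by the extractor name AscendingTuple2TimestampExtractor).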
Code example source: apache/flink
@Test(expected = NullPointerException.class)
public void testFailsWithoutUpperBound() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

    // Passing null as the upper bound is expected to throw NullPointerException.
    streamOne
        .keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(Time.milliseconds(0), null);
}
Code example source: apache/flink
@Test(expected = NullPointerException.class)
public void testFailsWithoutLowerBound() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

    // Passing null as the lower bound is expected to throw NullPointerException.
    streamOne
        .keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(null, Time.milliseconds(1));
}
Code example source: apache/flink
@Test(expected = UnsupportedTimeCharacteristicException.class)
public void testExecutionFailsInProcessingTime() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The job is configured for processing time, so the test expects
    // UnsupportedTimeCharacteristicException.
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

    streamOne.keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(Time.milliseconds(0), Time.milliseconds(0))
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public void processElement(Tuple2<String, Integer> left,
                    Tuple2<String, Integer> right, Context ctx,
                    Collector<String> out) throws Exception {
                out.collect(left + ":" + right);
            }
        });
}
Code example source: apache/flink
.intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
.between(Time.milliseconds(-1), Time.milliseconds(1))
.process(new CombineToStringJoinFunction())
Code example source: apache/flink
@Test
public void testBoundsCanBeInclusive() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    streamOne.keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(Time.milliseconds(0), Time.milliseconds(2))
        .process(new CombineToStringJoinFunction())
        .addSink(new ResultSink());

    env.execute();

    expectInAnyOrder(
        "(key,0):(key,0)",
        "(key,0):(key,1)",
        "(key,0):(key,2)",
        "(key,1):(key,1)",
        "(key,1):(key,2)",
        "(key,2):(key,2)"
    );
}
Code example source: apache/flink
@Test
public void testBoundsAreInclusiveByDefault() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    // No bound modifiers are applied, so both bounds keep their default (inclusive) behavior.
    streamOne.keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(Time.milliseconds(0), Time.milliseconds(2))
        .process(new CombineToStringJoinFunction())
        .addSink(new ResultSink());

    env.execute();

    expectInAnyOrder(
        "(key,0):(key,0)",
        "(key,0):(key,1)",
        "(key,0):(key,2)",
        "(key,1):(key,1)",
        "(key,1):(key,2)",
        "(key,2):(key,2)"
    );
}
Code example source: apache/flink
@Test
public void testBoundsCanBeExclusive() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> streamOne = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    DataStream<Tuple2<String, Integer>> streamTwo = env.fromElements(
        Tuple2.of("key", 0),
        Tuple2.of("key", 1),
        Tuple2.of("key", 2)
    ).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

    // Both bounds are made exclusive, so only pairs strictly inside the interval are expected.
    streamOne.keyBy(new Tuple2KeyExtractor())
        .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
        .between(Time.milliseconds(0), Time.milliseconds(2))
        .upperBoundExclusive()
        .lowerBoundExclusive()
        .process(new CombineToStringJoinFunction())
        .addSink(new ResultSink());

    env.execute();

    expectInAnyOrder(
        "(key,0):(key,1)",
        "(key,1):(key,2)"
    );
}
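The effect of upperBoundExclusive() and lowerBoundExclusive() on the result set can be illustrated with a small plain-Java sketch of the bound check. This is only an illustration under the assumption that timestamps are whole milliseconds and that the integer field of each tuple serves as its timestamp; ExclusiveBoundsSketch, inInterval, and pairs are hypothetical names and not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of inclusive vs. exclusive interval bounds; NOT Flink code.
class ExclusiveBoundsSketch {

    // Checks whether (rightTs - leftTs) lies within [lower, upper], where each end
    // can be switched from inclusive to exclusive.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
            boolean lowerInclusive, boolean upperInclusive) {
        long diff = rightTs - leftTs;
        boolean aboveLower = lowerInclusive ? diff >= lower : diff > lower;
        boolean belowUpper = upperInclusive ? diff <= upper : diff < upper;
        return aboveLower && belowUpper;
    }

    // Joins a list of timestamps with itself and formats matches like the tests above.
    static List<String> pairs(long[] timestamps, long lower, long upper,
            boolean lowerInclusive, boolean upperInclusive) {
        List<String> out = new ArrayList<>();
        for (long l : timestamps) {
            for (long r : timestamps) {
                if (inInterval(l, r, lower, upper, lowerInclusive, upperInclusive)) {
                    out.add("(key," + l + "):(key," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0, 1, 2};
        // Both bounds exclusive over (0, 2): only a difference of exactly 1 ms qualifies.
        System.out.println(pairs(ts, 0, 2, false, false));
    }
}
```

Because the timestamps are integers, making a bound exclusive is equivalent to narrowing the interval by one millisecond on that side, which is why only the two pairs with a difference of 1 remain.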
Code example source: apache/flink
.intervalJoin(streamTwo)
Code example source: apache/flink
.intervalJoin(streamTwo)
.between(Time.milliseconds(0), Time.milliseconds(0))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
Code example source: apache/flink
.intervalJoin(source.keyBy((in) -> in))
.between(Time.milliseconds(10L), Time.milliseconds(10L))
.process(new TestProcessJoinFunction<>())
source.join(source)
    .where(new TestKeySelector<>(), Types.STRING)
    .equalTo(new TestKeySelector<>(), Types.STRING);

source.keyBy((in) -> in)
    .intervalJoin(source.keyBy((in) -> in))
    .between(Time.milliseconds(10L), Time.milliseconds(10L))
    .process(new TestProcessJoinFunction<Long, Long, String>())
    .returns(Types.STRING);

source.keyBy((in) -> in)
    .intervalJoin(source.keyBy((in) -> in))
    .between(Time.milliseconds(10L), Time.milliseconds(10L))
    .process(new TestProcessJoinFunction<>(), Types.STRING);