本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.connect()
方法的一些代码示例,展示了KeyedStream.connect()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KeyedStream.connect()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:connect
暂无
代码示例来源:origin: apache/flink
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
代码示例来源:origin: apache/flink
.connect(npBroadcastStream)
.process(firstBroadcastFunction).uid("BrProcess1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.connect(pBroadcastStream)
.process(secondBroadcastFunction).uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
代码示例来源:origin: apache/flink
.connect(ds2.keyBy(i -> i))
.process(new CoProcessFunction<Integer, Integer, Integer>() {
@Override
代码示例来源:origin: apache/flink
.connect(ds2.keyBy(i -> i))
.process(new CoProcessFunction<Integer, Integer, Integer>() {
@Override
代码示例来源:origin: dataArtisans/flink-training-exercises
.connect(broadcastRulesStream)
.process(new MatchFunction());
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Simulated trade stream
DataStream<Trade> tradeStream = FinSources.tradeSource(env);
// Simulated customer stream
DataStream<Customer> customerStream = FinSources.customerSource(env);
// Stream of enriched trades
DataStream<EnrichedTrade> joinedStream = tradeStream
.keyBy("customerId")
.connect(customerStream.keyBy("customerId"))
.process(new ProcessingTimeJoinFunction());
joinedStream.print();
env.execute("processing-time join");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Simulated trade stream
DataStream<Trade> tradeStream = FinSources.tradeSource(env);
// Simulated customer stream
DataStream<Customer> customerStream = FinSources.customerSource(env);
// Stream of enriched trades
DataStream<EnrichedTrade> joinedStream = tradeStream
.keyBy("customerId")
.connect(customerStream.keyBy("customerId"))
.process(new EventTimeJoinFunction());
joinedStream.print();
env.execute("event-time join");
}
代码示例来源:origin: king/bravo
public DataStream<String> constructTestPipeline(DataStream<String> source) {
OutputTag<Integer> filtered = new OutputTag<>("filter", BasicTypeInfo.INT_TYPE_INFO);
OutputTag<Integer> process = new OutputTag<>("process", BasicTypeInfo.INT_TYPE_INFO);
SingleOutputStreamOperator<String> input = source.process(new ProcessFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public void processElement(String s, Context ctx,
Collector<String> out) throws Exception {
if (s.startsWith("filter ")) {
ctx.output(filtered, Integer.parseInt(s.substring(7)));
} else if (s.startsWith("process ")) {
ctx.output(process, Integer.parseInt(s.substring(8)));
} else {
throw new RuntimeException("oOoO");
}
}
});
BroadcastStream<Integer> broadcast = input.getSideOutput(filtered).broadcast(bcstate);
return input.getSideOutput(process).keyBy(i -> i).connect(broadcast).process(new BroadcastProcessor(bcstate))
.uid("stateful");
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int maxEventDelay = 60; // events are out of order by at most 60 seconds
final int servingSpeedFactor = 600; // 10 minutes worth of events are served every second
// In this simple case we need a broadcast state descriptor, but aren't going to
// use it to store anything.
final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
"dummy",
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO
);
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
// add a socket source
BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
// EXERCISE QUESTION: Is this needed?
// .assignTimestampsAndWatermarks(new QueryStreamAssigner())
.broadcast(dummyBroadcastState);
DataStream<TaxiRide> reports = rides
.keyBy((TaxiRide ride) -> ride.taxiId)
.connect(queryStream)
.process(new QueryFunction());
printOrTest(reports);
env.execute("Ongoing Rides");
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int maxEventDelay = 60; // events are out of order by at most 60 seconds
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
// setup a stream of taxi rides
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
// add a socket source for the query stream
BroadcastStream<String> queryStream = env
.addSource(stringSourceOrTest(new SocketTextStreamFunction("localhost", 9999, "\n", -1)))
.broadcast(queryDescriptor);
// connect the two streams and process queries
DataStream<Tuple2<String, String>> results = rides
.keyBy((TaxiRide ride) -> ride.taxiId)
.connect(queryStream)
.process(new QueryProcessor());
printOrTest(results);
env.execute("Taxi Query");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int maxEventDelay = 60; // events are out of order by at most 60 seconds
final int servingSpeedFactor = 600; // 10 minutes worth of events are served every second
// In this simple case we need a broadcast state descriptor, but aren't going to
// use it to store anything.
final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
"dummy",
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO
);
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
// add a socket source
BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new QueryStreamAssigner())
.broadcast(dummyBroadcastState);
DataStream<TaxiRide> reports = rides
.keyBy((TaxiRide ride) -> ride.taxiId)
.connect(queryStream)
.process(new QueryFunction());
printOrTest(reports);
env.execute("Ongoing Rides");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int maxEventDelay = 60; // events are out of order by at most 60 seconds
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
// setup a stream of taxi rides
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
// add a socket source for the query stream
BroadcastStream<String> queryStream = env
.addSource(stringSourceOrTest(new SocketTextStreamFunction("localhost", 9999, "\n", -1)))
.assignTimestampsAndWatermarks(new QueryStreamAssigner())
.broadcast(queryDescriptor);
// connect the two streams and process queries
DataStream<Tuple2<String, String>> results = rides
.keyBy((TaxiRide ride) -> ride.taxiId)
.connect(queryStream)
.process(new QueryProcessor());
printOrTest(results);
env.execute("Taxi Query");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
.connect(queryStream)
.process(new QueryFunction());
代码示例来源:origin: dataArtisans/flink-training-exercises
.connect(queryStream)
.process(new QueryFunction());
代码示例来源:origin: dataArtisans/flink-training-exercises
.connect(queryStream)
.process(new QueryFunction());
代码示例来源:origin: dataArtisans/oscon
.connect(controlStream)
.flatMap(new AmplifierFunction())
.addSink(new InfluxDBSink<>("amplifiedSensors"));
内容来源于网络,如有侵权,请联系作者删除!