org.apache.flink.streaming.api.datastream.KeyedStream.connect()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(213)

本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.connect()方法的一些代码示例,展示了KeyedStream.connect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KeyedStream.connect()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:connect

KeyedStream.connect介绍

暂无

代码示例

代码示例来源:origin: apache/flink

.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);

代码示例来源:origin: apache/flink

.connect(npBroadcastStream)
.process(firstBroadcastFunction).uid("BrProcess1")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
.connect(pBroadcastStream)
.process(secondBroadcastFunction).uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

代码示例来源:origin: apache/flink

.connect(ds2.keyBy(i -> i))
.process(new CoProcessFunction<Integer, Integer, Integer>() {
  @Override

代码示例来源:origin: apache/flink

.connect(ds2.keyBy(i -> i))
.process(new CoProcessFunction<Integer, Integer, Integer>() {
  @Override

代码示例来源:origin: dataArtisans/flink-training-exercises

.connect(broadcastRulesStream)
.process(new MatchFunction());

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // Simulated trade stream
  DataStream<Trade> tradeStream = FinSources.tradeSource(env);
  // Simulated customer stream
  DataStream<Customer> customerStream = FinSources.customerSource(env);
  // Stream of enriched trades
  DataStream<EnrichedTrade> joinedStream = tradeStream
      .keyBy("customerId")
      .connect(customerStream.keyBy("customerId"))
      .process(new ProcessingTimeJoinFunction());
  joinedStream.print();
  env.execute("processing-time join");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // Simulated trade stream
  DataStream<Trade> tradeStream = FinSources.tradeSource(env);
  // Simulated customer stream
  DataStream<Customer> customerStream = FinSources.customerSource(env);
  // Stream of enriched trades
  DataStream<EnrichedTrade> joinedStream = tradeStream
      .keyBy("customerId")
      .connect(customerStream.keyBy("customerId"))
      .process(new EventTimeJoinFunction());
  joinedStream.print();
  env.execute("event-time join");
}

代码示例来源:origin: king/bravo

public DataStream<String> constructTestPipeline(DataStream<String> source) {
  OutputTag<Integer> filtered = new OutputTag<>("filter", BasicTypeInfo.INT_TYPE_INFO);
  OutputTag<Integer> process = new OutputTag<>("process", BasicTypeInfo.INT_TYPE_INFO);
  SingleOutputStreamOperator<String> input = source.process(new ProcessFunction<String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(String s, Context ctx,
        Collector<String> out) throws Exception {
      if (s.startsWith("filter ")) {
        ctx.output(filtered, Integer.parseInt(s.substring(7)));
      } else if (s.startsWith("process ")) {
        ctx.output(process, Integer.parseInt(s.substring(8)));
      } else {
        throw new RuntimeException("oOoO");
      }
    }
  });
  BroadcastStream<Integer> broadcast = input.getSideOutput(filtered).broadcast(bcstate);
  return input.getSideOutput(process).keyBy(i -> i).connect(broadcast).process(new BroadcastProcessor(bcstate))
      .uid("stateful");
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 600; 	// 10 minutes worth of events are served every second
  // In this simple case we need a broadcast state descriptor, but aren't going to
  // use it to store anything.
  final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
      "dummy",
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO
  );
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
  // add a socket source
  BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
      // EXERCISE QUESTION: Is this needed?
      // .assignTimestampsAndWatermarks(new QueryStreamAssigner())
      .broadcast(dummyBroadcastState);
  DataStream<TaxiRide> reports = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryFunction());
  printOrTest(reports);
  env.execute("Ongoing Rides");
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // setup a stream of taxi rides
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
  // add a socket source for the query stream
  BroadcastStream<String> queryStream = env
      .addSource(stringSourceOrTest(new SocketTextStreamFunction("localhost", 9999, "\n", -1)))
      .broadcast(queryDescriptor);
  // connect the two streams and process queries
  DataStream<Tuple2<String, String>> results = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryProcessor());
  printOrTest(results);
  env.execute("Taxi Query");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 600; 	// 10 minutes worth of events are served every second
  // In this simple case we need a broadcast state descriptor, but aren't going to
  // use it to store anything.
  final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
      "dummy",
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO
  );
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
  // add a socket source
  BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())
      .broadcast(dummyBroadcastState);
  DataStream<TaxiRide> reports = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryFunction());
  printOrTest(reports);
  env.execute("Ongoing Rides");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // setup a stream of taxi rides
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
  // add a socket source for the query stream
  BroadcastStream<String> queryStream = env
      .addSource(stringSourceOrTest(new SocketTextStreamFunction("localhost", 9999, "\n", -1)))
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())
      .broadcast(queryDescriptor);
  // connect the two streams and process queries
  DataStream<Tuple2<String, String>> results = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryProcessor());
  printOrTest(results);
  env.execute("Taxi Query");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

.connect(queryStream)
.process(new QueryFunction());

代码示例来源:origin: dataArtisans/flink-training-exercises

.connect(queryStream)
.process(new QueryFunction());

代码示例来源:origin: dataArtisans/flink-training-exercises

.connect(queryStream)
.process(new QueryFunction());

代码示例来源:origin: dataArtisans/oscon

.connect(controlStream)
.flatMap(new AmplifierFunction())
.addSink(new InfluxDBSink<>("amplifiedSensors"));

相关文章