org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.broadcast()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(4.6k)|赞(0)|评价(0)|浏览(114)

本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.broadcast()方法的一些代码示例,展示了SingleOutputStreamOperator.broadcast()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.broadcast()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:broadcast

SingleOutputStreamOperator.broadcast介绍

暂无

代码示例

代码示例来源:origin: apache/flink

.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc);
.broadcast(thirdBroadcastStateDesc);

代码示例来源:origin: apache/flink

head1.rebalance().map(noOpIntMap).broadcast(), head2.shuffle()));

代码示例来源:origin: apache/flink

head1.map(noOpIntMap).name("bc").broadcast(),
head2.map(noOpIntMap).shuffle()));

代码示例来源:origin: dataArtisans/flink-training-exercises

.broadcast(rulesStateDescriptor);

代码示例来源:origin: haoch/flink-siddhi

/**
 * Siddhi Continuous Query Language (CQL)
 *
 * @return ExecutionSiddhiStream context
 */
public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
  DataStream<Tuple2<StreamRoute, Object>> unionStream = controlStream
    .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
    .broadcast()
    .union(this.toDataStream())
    .transform("add route transform",
      SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)),
      new AddRouteOperator(getCepEnvironment().getDataStreamSchemas()));
  DataStream<Tuple2<StreamRoute, Object>> partitionedStream = new DataStream<>(
    unionStream.getExecutionEnvironment(),
    new PartitionTransformation<>(unionStream.getTransformation(),
    new DynamicPartitioner()));
  return new ExecutionSiddhiStream(partitionedStream, null, getCepEnvironment());
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 600; 	// 10 minutes worth of events are served every second
  // In this simple case we need a broadcast state descriptor, but aren't going to
  // use it to store anything.
  final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
      "dummy",
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO
  );
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
  // add a socket source
  BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())
      .broadcast(dummyBroadcastState);
  DataStream<TaxiRide> reports = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryFunction());
  printOrTest(reports);
  env.execute("Ongoing Rides");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);
  final String input = params.get("input", ExerciseBase.pathToRideData);
  final int maxEventDelay = 60;       	// events are out of order by at most 60 seconds
  final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(ExerciseBase.parallelism);
  // setup a stream of taxi rides
  DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
  // add a socket source for the query stream
  BroadcastStream<String> queryStream = env
      .addSource(stringSourceOrTest(new SocketTextStreamFunction("localhost", 9999, "\n", -1)))
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())
      .broadcast(queryDescriptor);
  // connect the two streams and process queries
  DataStream<Tuple2<String, String>> results = rides
      .keyBy((TaxiRide ride) -> ride.taxiId)
      .connect(queryStream)
      .process(new QueryProcessor());
  printOrTest(results);
  env.execute("Taxi Query");
}

代码示例来源:origin: dataArtisans/flink-training-exercises

.broadcast(queryDescriptor);

代码示例来源:origin: dataArtisans/flink-training-exercises

.broadcast(queryDescriptor);

代码示例来源:origin: dataArtisans/flink-training-exercises

.broadcast(queryDescriptor);

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

broadcast().
connect(orders);

相关文章

微信公众号

最新文章

更多