Usage and code examples of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.connect() method

Reposted by x33g5p2x on 2022-01-30, category: Other

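This article collects Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.connect() method and shows how SingleOutputStreamOperator.connect() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as practical references. Details of the SingleOutputStreamOperator.connect() method:
Fully qualified class name: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: connect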

Introduction to SingleOutputStreamOperator.connect

No description available.
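
As a rough orientation: SingleOutputStreamOperator extends DataStream, so connect is the inherited DataStream.connect, which returns a ConnectedStreams over two inputs whose element types may differ. A following map takes a CoMapFunction with one method per input (map1 and map2) and a single output type. The sketch below models only that contract in plain Java, with no Flink dependency. The names ConnectSketch, CoMap, connectAndMap, and demo are hypothetical and are not part of Flink. The model drains the first input before the second, which is a simplification, since a real job gives no ordering guarantee between the two inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free model of connect(...).map(...); none of these names are Flink classes.
final class ConnectSketch {

    // Hypothetical stand-in shaped like Flink's CoMapFunction<IN1, IN2, OUT>.
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value); // invoked for each element of the first input
        OUT map2(IN2 value); // invoked for each element of the second input
    }

    // Applies map1 to the first input and map2 to the second, collecting one output list.
    // Simplification: the first input is drained before the second; a real job
    // interleaves the two inputs with no ordering guarantee between them.
    static <A, B, O> List<O> connectAndMap(List<A> first, List<B> second, CoMap<A, B, O> fn) {
        List<O> out = new ArrayList<>();
        for (A a : first) {
            out.add(fn.map1(a));
        }
        for (B b : second) {
            out.add(fn.map2(b));
        }
        return out;
    }

    // Two inputs of different types (Integer, String) unified into one String output.
    static List<String> demo() {
        CoMap<Integer, String, String> tagger = new CoMap<Integer, String, String>() {
            @Override
            public String map1(Integer value) {
                return "int:" + value;
            }

            @Override
            public String map2(String value) {
                return "str:" + value;
            }
        };
        return connectAndMap(Arrays.asList(1, 2), Arrays.asList("a"), tagger);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [int:1, int:2, str:a]
    }
}
```

The point of the two-method interface is that each input keeps its own type until the function converts it, which is why the examples on this page can connect streams of different element types and still produce one typed output.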

Code examples

Code example source: apache/flink

.connect(timedOutStream)
.map(new CoMapTimeout<>())
.returns(outTypeInfo);

Code example source: apache/flink

.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.addSink(new DiscardingSink<Integer>());

Code example source: apache/flink

.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())

Code example source: apache/flink

/**
 * These check whether timestamps are properly assigned at the sources and handled in
 * network transmission and between chained operators when timestamps are enabled.
 */
@Test
public void testTimestampHandling() throws Exception {
  final int numElements = 10;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(PARALLELISM);
  env.getConfig().disableSysoutLogging();
  DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, numElements));
  DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, numElements));
  source1
      .map(new IdentityMap())
      .connect(source2).map(new IdentityCoMap())
      .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
      .addSink(new DiscardingSink<Integer>());
  env.execute();
}

Code example source: apache/flink

/**
 * These check whether timestamps are properly ignored when they are disabled.
 */
@Test
public void testDisabledTimestamps() throws Exception {
  final int numElements = 10;
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  env.setParallelism(PARALLELISM);
  env.getConfig().disableSysoutLogging();
  DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(numElements));
  DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(numElements));
  source1
      .map(new IdentityMap())
      .connect(source2).map(new IdentityCoMap())
      .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
      .addSink(new DiscardingSink<Integer>());
  env.execute();
}

Code example source: org.apache.flink/flink-cep

return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);

Code example source: org.apache.flink/flink-cep_2.11

return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
