Usage of the org.apache.flink.streaming.api.datastream.KeyedStream.reduce() method, with code examples


This article collects Java code examples of org.apache.flink.streaming.api.datastream.KeyedStream.reduce() and shows how KeyedStream.reduce() is used in practice. The examples come from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are useful as references. Details of the KeyedStream.reduce() method are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: KeyedStream
Method: reduce

Introduction to KeyedStream.reduce

Applies a reduce transformation on the data stream grouped by the given key position. The ReduceFunction receives input values based on the key value; only input values with the same key go to the same reducer.
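
To make this concrete, here is a minimal, self-contained sketch of keyBy() followed by reduce() (not taken from the excerpts below; the class and field names are illustrative). It keys a stream of Tuple2<String, Integer> records by the first field and emits a rolling per-key sum:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedReduceExample {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3))
        // Group the stream by the first tuple field; elements with the same
        // key are routed to the same reducer.
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
          @Override
          public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
          }
        })
        // Rolling reduce: each incoming element is combined with the last
        // reduced value for its key, and the result is emitted.
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                Tuple2<String, Integer> value2) {
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
          }
        })
        .print();

    env.execute("KeyedStream.reduce example");
  }
}

For this sample input the reducer emits ("a", 1), ("b", 2), and then ("a", 4), possibly interleaved across parallel subtasks: reduce() produces the running aggregate for each key rather than a single final value.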

Code examples

Code example source: origin: apache/flink

/**
   * A thin wrapper layer over {@link KeyedStream#reduce(ReduceFunction)}.
   *
   * @param reducer The {@link ReduceFunction} that will be called for every
   * element of the input values with the same key.
   * @return The transformed data stream {@link PythonSingleOutputStreamOperator}.
   */
  public PythonSingleOutputStreamOperator reduce(ReduceFunction<PyObject> reducer) throws IOException {
    return new PythonSingleOutputStreamOperator(this.stream.reduce(new PythonReduceFunction(reducer)));
  }
}

Code example source: origin: apache/flink

@Test
public void testUserProvidedHashing() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
  env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
      .map(new NoOpMapFunction())
      .filter(new NoOpFilterFunction())
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
  StreamGraph streamGraph = env.getStreamGraph();
  int idx = 1;
  for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
    List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
    Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
    --idx;
  }
}

Code example source: origin: apache/flink

.reduce(new OnceFailingReducer(numElements))

Code example source: origin: apache/flink

opMethod.invoke(filter, resource3);
DataStream<Tuple2<Integer, Integer>> reduce = filter.keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
  @Override
  public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
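The excerpt above is cut off inside the anonymous ReduceFunction. As an illustrative sketch only (not the original project's code), a typical body for such a Tuple2<Integer, Integer> reducer keeps the key field and combines the value field:

new ReduceFunction<Tuple2<Integer, Integer>>() {
  @Override
  public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
      Tuple2<Integer, Integer> value2) throws Exception {
    // Hypothetical body: keep the key (f0) and sum the values (f1).
    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
  }
}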

Code example source: origin: apache/flink

@Test
public void testUserProvidedHashingOnChainSupported() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      .map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
      .filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
      .keyBy(new NoOpKeySelector())
      .reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
  env.getStreamGraph().getJobGraph();
}

Code example source: origin: apache/flink

.reduce(new NoOpReduceFunction())
.addSink(new NoOpSinkFunction())
.name("sink0").uid("sink0");
.reduce(new NoOpReduceFunction())
.addSink(new NoOpSinkFunction())
.name("sink1").uid("sink1");

Code example source: origin: apache/flink

.reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
  @Override
  public Tuple2<Integer, Long> reduce(

Code example source: origin: apache/flink

.reduce(new OnceFailingReducer(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {

Code example source: origin: apache/flink

.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");
.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce");
