org.apache.flink.streaming.api.datastream.KeyedStream.sum()方法的使用及代码示例

x33g5p2x  于2022-01-23 转载在 其他  
字(5.5k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中org.apache.flink.streaming.api.datastream.KeyedStream.sum()方法的一些代码示例,展示了KeyedStream.sum()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。KeyedStream.sum()方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.KeyedStream
类名称:KeyedStream
方法名:sum

KeyedStream.sum介绍

[英]Applies an aggregation that gives a rolling sum of the data stream at the given position grouped by the given key. An independent aggregate is kept per key.
[中]应用一个聚合,该聚合在给定位置提供由给定键分组的数据流的滚动和。每个键保留一个独立的聚合。

代码示例

代码示例来源:origin: apache/flink

out.collect(new Tuple2<>(value, 1));
}).keyBy(0).sum(1).print();

代码示例来源:origin: apache/flink

@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailOnNestedPojoFieldAccessor() throws Exception {
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Data> dataStream = see.fromCollection(elements);
  dataStream.keyBy("aaa", "stats.count").sum("stats.nonExistingField");
}

代码示例来源:origin: apache/flink

.keyBy(0).sum(1);

代码示例来源:origin: apache/flink

.keyBy(0).sum(1);

代码示例来源:origin: apache/flink

@Test
public void testProgram() throws Exception {
  String resultPath = getTempDirPath("result");
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<String> text = env.fromElements(WordCountData.TEXT);
  DataStream<Tuple2<String, Integer>> counts = text
      .flatMap(new Tokenizer())
      .keyBy(0).sum(1);
  counts.writeAsCsv(resultPath);
  env.execute("WriteAsCsvTest");
  //Strip the parentheses from the expected text like output
  compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
      .replaceAll("[\\\\(\\\\)]", ""), resultPath);
}

代码示例来源:origin: apache/flink

@Test
public void testProgram() throws Exception {
  String resultPath = getTempDirPath("result");
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<String> text = env.fromElements(WordCountData.TEXT);
  DataStream<Tuple2<String, Integer>> counts = text
      .flatMap(new Tokenizer())
      .keyBy(0).sum(1);
  counts.writeAsText(resultPath);
  env.execute("WriteAsTextTest");
  compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}

代码示例来源:origin: apache/flink

@Test
public void testNettyEpoll() throws Exception {
  MiniClusterWithClientResource cluster = trySetUpCluster();
  try {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(NUM_TASK_MANAGERS);
    env.getConfig().disableSysoutLogging();
    DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42);
    input.keyBy(new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
          return value;
        }
      })
      .sum(0)
      .print();
    env.execute();
  }
  finally {
    cluster.after();
  }
}

代码示例来源:origin: apache/flink

@Test
public void testNestedPojoFieldAccessor() throws Exception {
  StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
  see.getConfig().disableObjectReuse();
  see.setParallelism(4);
  DataStream<Data> dataStream = see.fromCollection(elements);
  DataStream<Data> summedStream = dataStream
    .keyBy("aaa")
    .sum("stats.count")
    .keyBy("aaa")
    .flatMap(new FlatMapFunction<Data, Data>() {
      Data[] first = new Data[3];
      @Override
      public void flatMap(Data value, Collector<Data> out) throws Exception {
        if (first[value.aaa] == null) {
          first[value.aaa] = value;
          if (value.stats.count != 123) {
            throw new RuntimeException("Expected stats.count to be 123");
          }
        } else {
          if (value.stats.count != 2 * 123) {
            throw new RuntimeException("Expected stats.count to be 2 * 123");
          }
        }
      }
    });
  summedStream.print();
  see.execute();
}

代码示例来源:origin: apache/flink

.sum("sum")
.keyBy("aaa", "stats.count")
.flatMap(new FlatMapFunction<Data, Data>() {

代码示例来源:origin: apache/flink

.sum("sum")
.keyBy("aaa", "abc", "wxyz")
.flatMap(new FlatMapFunction<Data, Data>() {

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

out.collect(new Tuple2<>(value, 1));
}).keyBy(0).sum(1).print();

代码示例来源:origin: amidst/toolbox

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple2<String, Integer>> dataStream = env
      .fromElements("Who's there?",
          "I think I hear them. Stand, ho! Who's there?")
      //.socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .sum(1);
  dataStream.print();
  env.execute();
  //env.execute("Socket Stream WordCount");
}

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
    .keyBy(0).sum(1).setParallelism(parallelism).setResources(resourceSpec);
} else {
  counts = text.flatMap(new Tokenizer()).setParallelism(parallelism)
    .keyBy(0).sum(1).setParallelism(parallelism);

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
    .keyBy(0).sum(1).setParallelism(parallelism).setResources(resourceSpec);
} else {
  counts = text.flatMap(new Tokenizer()).setParallelism(parallelism)
    .keyBy(0).sum(1).setParallelism(parallelism);

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

.keyBy(0).sum(1);

代码示例来源:origin: dataArtisans/flink-training-exercises

DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

代码示例来源:origin: com.alibaba.blink/flink-examples-streaming

build()).
keyBy(0, 2, 3).
sum(1).
setParallelism(groupbyParallelism).
setResources(

相关文章