This article collects code examples of the Java method org.apache.flink.streaming.api.datastream.KeyedStream.sum(), showing how KeyedStream.sum() is used in practice. The examples are extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are reasonably representative and should serve as a useful reference. Details of KeyedStream.sum():
Package: org.apache.flink.streaming.api.datastream.KeyedStream
Class: KeyedStream
Method: sum
Description: Applies an aggregation that gives a rolling sum of the data stream at the given position, grouped by the given key. An independent aggregate is kept per key.
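Before the examples, the "independent aggregate per key" semantics can be sketched in plain Java, without a Flink runtime, as a per-key running total that emits the updated partial sum after every element. The word list and the RollingSumDemo class name below are made up for illustration; this mirrors what keyBy(0).sum(1) does to a stream of (word, 1) tuples:

```java
import java.util.HashMap;
import java.util.Map;

public class RollingSumDemo {
    public static void main(String[] args) {
        // Simulated stream of words; each element stands for a (word, 1) tuple
        String[] words = {"to", "be", "or", "not", "to", "be"};

        // One independent running sum per key, as keyBy(0).sum(1) maintains
        Map<String, Integer> sums = new HashMap<>();
        for (String word : words) {
            int updated = sums.merge(word, 1, Integer::sum);
            // A rolling aggregation emits the updated partial sum downstream
            // after each input element, not only a final total
            System.out.println("(" + word + "," + updated + ")");
        }
        // Final state: to=2, be=2, or=1, not=1
    }
}
```

Note that because the aggregation is rolling, downstream operators see every intermediate sum (e.g. ("to",1) and later ("to",2)), which is why the word-count tests below compare against streaming partial counts rather than one final count per word.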
Code example source: apache/flink

		out.collect(new Tuple2<>(value, 1));
	}).keyBy(0).sum(1).print();
Code example source: apache/flink

@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testFailOnNestedPojoFieldAccessor() throws Exception {
	StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<Data> dataStream = see.fromCollection(elements);
	dataStream.keyBy("aaa", "stats.count").sum("stats.nonExistingField");
}
Code example source: apache/flink

	.keyBy(0).sum(1);
Code example source: apache/flink

	.keyBy(0).sum(1);
Code example source: apache/flink

@Test
public void testProgram() throws Exception {
	String resultPath = getTempDirPath("result");
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<String> text = env.fromElements(WordCountData.TEXT);
	DataStream<Tuple2<String, Integer>> counts = text
			.flatMap(new Tokenizer())
			.keyBy(0).sum(1);
	counts.writeAsCsv(resultPath);
	env.execute("WriteAsCsvTest");
	// Strip the parentheses from the expected text-like output
	compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
			.replaceAll("[\\(\\)]", ""), resultPath);
}
Code example source: apache/flink

@Test
public void testProgram() throws Exception {
	String resultPath = getTempDirPath("result");
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<String> text = env.fromElements(WordCountData.TEXT);
	DataStream<Tuple2<String, Integer>> counts = text
			.flatMap(new Tokenizer())
			.keyBy(0).sum(1);
	counts.writeAsText(resultPath);
	env.execute("WriteAsTextTest");
	compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}
Code example source: apache/flink

@Test
public void testNettyEpoll() throws Exception {
	MiniClusterWithClientResource cluster = trySetUpCluster();
	try {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(NUM_TASK_MANAGERS);
		env.getConfig().disableSysoutLogging();
		DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42);
		input.keyBy(new KeySelector<Integer, Integer>() {
				@Override
				public Integer getKey(Integer value) throws Exception {
					return value;
				}
			})
			.sum(0)
			.print();
		env.execute();
	}
	finally {
		cluster.after();
	}
}
Code example source: apache/flink

@Test
public void testNestedPojoFieldAccessor() throws Exception {
	StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
	see.getConfig().disableObjectReuse();
	see.setParallelism(4);
	DataStream<Data> dataStream = see.fromCollection(elements);
	DataStream<Data> summedStream = dataStream
		.keyBy("aaa")
		.sum("stats.count")
		.keyBy("aaa")
		.flatMap(new FlatMapFunction<Data, Data>() {
			Data[] first = new Data[3];
			@Override
			public void flatMap(Data value, Collector<Data> out) throws Exception {
				if (first[value.aaa] == null) {
					first[value.aaa] = value;
					if (value.stats.count != 123) {
						throw new RuntimeException("Expected stats.count to be 123");
					}
				} else {
					if (value.stats.count != 2 * 123) {
						throw new RuntimeException("Expected stats.count to be 2 * 123");
					}
				}
			}
		});
	summedStream.print();
	see.execute();
}
Code example source: apache/flink

	.sum("sum")
	.keyBy("aaa", "stats.count")
	.flatMap(new FlatMapFunction<Data, Data>() {
Code example source: apache/flink

	.sum("sum")
	.keyBy("aaa", "abc", "wxyz")
	.flatMap(new FlatMapFunction<Data, Data>() {
Code example source: com.alibaba.blink/flink-examples-streaming

		out.collect(new Tuple2<>(value, 1));
	}).keyBy(0).sum(1).print();
Code example source: amidst/toolbox

public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	DataStream<Tuple2<String, Integer>> dataStream = env
			.fromElements("Who's there?",
					"I think I hear them. Stand, ho! Who's there?")
			//.socketTextStream("localhost", 9999)
			.flatMap(new Splitter())
			.keyBy(0)
			.sum(1);
	dataStream.print();
	env.execute();
	//env.execute("Socket Stream WordCount");
}
Code example source: com.alibaba.blink/flink-examples-streaming

		.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
		.keyBy(0).sum(1).setParallelism(parallelism).setResources(resourceSpec);
} else {
	counts = text.flatMap(new Tokenizer()).setParallelism(parallelism)
			.keyBy(0).sum(1).setParallelism(parallelism);
Code example source: com.alibaba.blink/flink-examples-streaming

		.flatMap(new Tokenizer()).setParallelism(parallelism).setResources(resourceSpec)
		.keyBy(0).sum(1).setParallelism(parallelism).setResources(resourceSpec);
} else {
	counts = text.flatMap(new Tokenizer()).setParallelism(parallelism)
			.keyBy(0).sum(1).setParallelism(parallelism);
Code example source: com.alibaba.blink/flink-examples-streaming

	.keyBy(0).sum(1);
Code example source: dataArtisans/flink-training-exercises

DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
Code example source: com.alibaba.blink/flink-examples-streaming

		build()).
	keyBy(0, 2, 3).
	sum(1).
	setParallelism(groupbyParallelism).
	setResources(