本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.keyBy()
方法的一些代码示例,展示了SingleOutputStreamOperator.keyBy()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.keyBy()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:keyBy
暂无
代码示例来源:origin: apache/flink
private static KeyedStream<Event, Integer> applyTestStatefulOperator(
String name,
JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
KeyedStream<Event, Integer> source,
List<TypeSerializer<ComplexPayload>> stateSer,
List<Class<ComplexPayload>> stateClass) {
return source
.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
.name(name)
.uid(name)
.returns(Event.class)
.keyBy(Event::getKey);
}
代码示例来源:origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
.name("EventSource")
.uid("EventSource")
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey);
List<TypeSerializer<ComplexPayload>> stateSer =
Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
afterStatefulOperations
.flatMap(createSemanticsCheckMapper(pt))
.name("SemanticsCheckMapper")
.addSink(new PrintSinkFunction<>());
env.execute("General purpose test job");
}
代码示例来源:origin: apache/flink
.keyBy(0).sum(1);
代码示例来源:origin: apache/flink
.keyBy(0)
.countWindow(windowSize, slideSize)
代码示例来源:origin: apache/flink
return element;
}).keyBy((KeySelector<Long, Long>) value -> value);
代码示例来源:origin: apache/flink
@Test
public void testUserProvidedHashing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
.map(new NoOpMapFunction())
.filter(new NoOpFilterFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
StreamGraph streamGraph = env.getStreamGraph();
int idx = 1;
for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
--idx;
}
}
代码示例来源:origin: apache/flink
out.collect("x " + value);
}).keyBy(String::length);
.keyBy(Long::intValue);
代码示例来源:origin: apache/flink
@Test
public void testProgram() throws Exception {
String resultPath = getTempDirPath("result");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.TEXT);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0).sum(1);
counts.writeAsCsv(resultPath);
env.execute("WriteAsCsvTest");
//Strip the parentheses from the expected text like output
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
.replaceAll("[\\\\(\\\\)]", ""), resultPath);
}
代码示例来源:origin: apache/flink
.keyBy(new IdentityKeySelector<Integer>())
.map(new OnceFailingPartitionedSum(failurePos))
.keyBy(0)
.addSink(new CounterSink());
代码示例来源:origin: apache/flink
@Test
public void testProgram() throws Exception {
String resultPath = getTempDirPath("result");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.TEXT);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0).sum(1);
counts.writeAsText(resultPath);
env.execute("WriteAsTextTest");
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
}
代码示例来源:origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final String checkpointDir = pt.getRequired("checkpoint.dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend(checkpointDir));
env.setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(1000L);
env.getConfig().disableGenericTypes();
env.addSource(new MySource()).uid("my-source")
.keyBy(anInt -> 0)
.map(new MyStatefulFunction()).uid("my-map")
.addSink(new DiscardingSink<>()).uid("my-sink");
env.execute();
}
}
代码示例来源:origin: apache/flink
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
setupEnvironment(env, pt);
final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
TtlTestConfig config = TtlTestConfig.fromArgs(pt);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
.cleanupIncrementally(5, true)
.cleanupFullSnapshot()
.build();
env
.addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
env.execute("State TTL test job");
}
代码示例来源:origin: apache/flink
/**
* Runs the following program.
* <pre>
* [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
env.enableCheckpointing(200);
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
// first vertex, chained to the source
// this filter throttles the flow until at least one checkpoint
// is complete, to make sure this program does not run without
.filter(new StringRichFilterFunction())
// -------------- seconds vertex - one-to-one connected ----------------
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
// -------------- third vertex - reducer and the sink ----------------
.keyBy("prefix")
.flatMap(new OnceFailingAggregator(failurePos))
.addSink(new ValidatingSink());
}
代码示例来源:origin: apache/flink
private static void runPartitioningProgram(int parallelism) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().enableObjectReuse();
env.setBufferTimeout(5L);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env
.addSource(new TimeStampingSource())
.map(new IdMapper<Tuple2<Long, Long>>())
.keyBy(0)
.addSink(new TimestampingSink());
env.execute("Partitioning Program");
}
代码示例来源:origin: apache/flink
@Test
public void testNestedPojoFieldAccessor() throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.getConfig().disableObjectReuse();
see.setParallelism(4);
DataStream<Data> dataStream = see.fromCollection(elements);
DataStream<Data> summedStream = dataStream
.keyBy("aaa")
.sum("stats.count")
.keyBy("aaa")
.flatMap(new FlatMapFunction<Data, Data>() {
Data[] first = new Data[3];
@Override
public void flatMap(Data value, Collector<Data> out) throws Exception {
if (first[value.aaa] == null) {
first[value.aaa] = value;
if (value.stats.count != 123) {
throw new RuntimeException("Expected stats.count to be 123");
}
} else {
if (value.stats.count != 2 * 123) {
throw new RuntimeException("Expected stats.count to be 2 * 123");
}
}
}
});
summedStream.print();
see.execute();
}
代码示例来源:origin: apache/flink
@Test
public void testProcessdWindowFunctionSideOutput() throws Exception {
TestListResultSink<Integer> resultSink = new TestListResultSink<>();
TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(3);
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Integer> dataStream = see.fromCollection(elements);
OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
SingleOutputStreamOperator<Integer> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
out.collect(integer);
context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
}
});
windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
windowOperator.addSink(resultSink);
see.execute();
assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
}
代码示例来源:origin: apache/flink
return element;
}).keyBy((KeySelector<Long, Long>) value -> value);
代码示例来源:origin: apache/flink
@Test
public void testUserProvidedHashingOnChainSupported() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
.map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
.filter(new NoOpFilterFunction()).setUidHash("cccccccccccccccccccccccccccccccc")
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
env.getStreamGraph().getJobGraph();
}
代码示例来源:origin: apache/flink
/**
* Runs the following program.
* <pre>
* [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
* </pre>
*/
@Override
public void testProgram(StreamExecutionEnvironment env) {
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
// -------------- first vertex, chained to the source ----------------
.filter(new StringRichFilterFunction())
.shuffle()
// -------------- seconds vertex - the stateful one that also fails ----------------
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
// -------------- third vertex - counter and the sink ----------------
.keyBy("prefix")
.map(new OnceFailingPrefixCounter(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {
@Override
public void invoke(PrefixCount value) throws Exception {
// Do nothing here
}
});
}
代码示例来源:origin: apache/flink
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
内容来源于网络,如有侵权,请联系作者删除!