本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()
方法的一些代码示例,展示了SingleOutputStreamOperator.setParallelism()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。SingleOutputStreamOperator.setParallelism()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:setParallelism
[英]Sets the parallelism for this operator.
[中]设置此运算符的并行度。
代码示例来源:origin: apache/flink
@Override
public DataStreamSource<T> setParallelism(int parallelism) {
if (parallelism != 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
} else {
super.setParallelism(parallelism);
return this;
}
}
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) {
return input.map(new NoOpMapFunction())
.setParallelism(4);
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4)
.uid("first");
return map;
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
return input
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4)
.uid("first");
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
return input
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4)
.uid("second");
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "second"))
.setParallelism(4)
.uid("second");
return map;
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
SingleOutputStreamOperator<Integer> map = input
.map(new StatefulStringStoringMap(mode, "third"))
.setParallelism(4)
.uid("third");
return map;
}
代码示例来源:origin: apache/flink
/**
* Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
* The internal timestamps are, for example, used to to event-time window operations.
*
* <p>If you know that the timestamps are strictly increasing you can use an
* {@link AscendingTimestampExtractor}. Otherwise,
* you should provide a {@link TimestampExtractor} that also implements
* {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
*
* @param extractor The TimestampExtractor that is called for each element of the DataStream.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)}
* of {@link #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)}
* instead.
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
* @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
*/
@Deprecated
public SingleOutputStreamOperator<T> assignTimestamps(TimestampExtractor<T> extractor) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
int inputParallelism = getTransformation().getParallelism();
ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
代码示例来源:origin: apache/flink
public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
return input
.keyBy(0)
.countWindow(1)
.apply(new StatefulWindowFunction(mode))
.setParallelism(4)
.uid("window");
}
代码示例来源:origin: apache/flink
.setParallelism(inputParallelism);
代码示例来源:origin: apache/flink
.setParallelism(inputParallelism);
代码示例来源:origin: apache/flink
private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);
stream
.filter(ignored -> false).setParallelism(parallelism)
.startNewChain()
.print().setParallelism(parallelism);
}
}
代码示例来源:origin: apache/flink
@Test(expected = UnsupportedOperationException.class)
public void testDifferingParallelism() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10)
.map(noOpIntMap);
IterativeStream<Integer> iter1 = source.iterate();
iter1.closeWith(iter1.map(noOpIntMap).setParallelism(parallelism / 2));
}
代码示例来源:origin: apache/flink
@Test(expected = UnsupportedOperationException.class)
public void testCoDifferingParallelism() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);
ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);
coIter.closeWith(coIter.map(noOpIntCoMap).setParallelism(parallelism / 2));
}
代码示例来源:origin: apache/flink
.setParallelism(input1.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType);
代码示例来源:origin: apache/flink
@Test(expected = UnsupportedOperationException.class)
public void testForwardFailsHightToLowParallelism() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// this does a rebalance that works
DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
// this doesn't work because it goes from 3 to 1
src.forward().map(new NoOpIntMap()).setParallelism(1);
env.execute();
}
代码示例来源:origin: apache/flink
DataStream<Integer> head1 = iter1.map(noOpIntMap).name("IterRebalanceMap").setParallelism(parallelism / 2);
DataStream<Integer> head2 = iter1.map(noOpIntMap).name("IterForwardMap");
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
代码示例来源:origin: apache/flink
/**
* Tests that the max parallelism is automatically set to the parallelism if it has not been
* specified.
*/
@Test
public void testAutoMaxParallelism() {
int globalParallelism = 42;
int mapParallelism = 17;
int maxParallelism = 21;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(globalParallelism);
DataStream<Integer> source = env.fromElements(1, 2, 3);
DataStream<Integer> keyedResult1 = source.keyBy(value -> value).map(new NoOpIntMap());
DataStream<Integer> keyedResult2 = keyedResult1.keyBy(value -> value).map(new NoOpIntMap()).setParallelism(mapParallelism);
DataStream<Integer> keyedResult3 = keyedResult2.keyBy(value -> value).map(new NoOpIntMap()).setMaxParallelism(maxParallelism);
DataStream<Integer> keyedResult4 = keyedResult3.keyBy(value -> value).map(new NoOpIntMap()).setMaxParallelism(maxParallelism).setParallelism(mapParallelism);
keyedResult4.addSink(new DiscardingSink<>());
StreamGraph graph = env.getStreamGraph();
StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
}
代码示例来源:origin: apache/flink
.setParallelism(parallelism / 2)
.name("shuffle").rebalance();
DataStreamSink<Integer> head3 = iter1.map(noOpIntMap).setParallelism(parallelism / 2)
.addSink(new ReceiveCheckNoOpSink<Integer>());
DataStreamSink<Integer> head4 = iter1.map(noOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
代码示例来源:origin: apache/flink
assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
map.setParallelism(2);
assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
内容来源于网络,如有侵权,请联系作者删除!