本文整理了Java中org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.print()
方法的一些代码示例,展示了SingleOutputStreamOperator.print()
的具体用法。这些代码示例主要来源于GitHub
/Stack Overflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考价值,能在一定程度上帮助到你。SingleOutputStreamOperator.print()
方法的具体详情如下:
包路径:org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
类名称:SingleOutputStreamOperator
方法名:print
暂无
代码示例来源:origin: apache/flink
out.collect(new Tuple2<>(value, 1));
}).keyBy(0).sum(1).print();
代码示例来源:origin: apache/flink
/**
 * Registers a small pipeline on {@code env}: a sequence source over 1..100, a filter that
 * rejects every element, and a print sink. The same parallelism is applied to all three.
 *
 * @param env the environment the pipeline is added to
 * @param parallelism parallelism set on the source, the filter and the sink
 */
private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
    env.generateSequence(1, 100)
        .setParallelism(parallelism)
        // Nothing passes this filter; startNewChain() is requested on it.
        .filter(unused -> false)
        .setParallelism(parallelism)
        .startNewChain()
        .print()
        .setParallelism(parallelism);
}
}
代码示例来源:origin: apache/flink
/**
 * Opens an iteration, never closes it (no {@code closeWith} call) and executes the job.
 * The test passes only if an {@link IllegalStateException} is thrown.
 */
@Test(expected = IllegalStateException.class)
public void testExecutionWithEmptyIteration() throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    // The iteration head is created but no feedback edge is ever attached to it.
    final IterativeStream<Integer> openIteration =
        environment.fromElements(1, 10).map(noOpIntMap).iterate();
    openIteration.map(noOpIntMap).print();

    environment.execute();
}
代码示例来源:origin: apache/flink
.filter(dummyFilter).slotSharingGroup("group 1")
.filter(dummyFilter).startNewChain()
.print().disableChaining();
.filter(dummyFilter).slotSharingGroup("group 2")
.filter(dummyFilter).startNewChain()
.print().disableChaining();
代码示例来源:origin: apache/flink
.print();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
代码示例来源:origin: apache/flink
/**
 * Runs a small keyed-sum job against the cluster returned by {@code trySetUpCluster()}.
 * The cluster's {@code after()} is always invoked, whether or not the job succeeds.
 */
@Test
public void testNettyEpoll() throws Exception {
    final MiniClusterWithClientResource miniCluster = trySetUpCluster();
    try {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(NUM_TASK_MANAGERS);
        environment.getConfig().disableSysoutLogging();

        final DataStream<Integer> numbers = environment.fromElements(1, 2, 3, 4, 1, 2, 3, 42);

        // Every element serves as its own key.
        final KeySelector<Integer, Integer> identityKey = new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer element) throws Exception {
                return element;
            }
        };

        numbers.keyBy(identityKey).sum(0).print();

        environment.execute();
    } finally {
        miniCluster.after();
    }
}
代码示例来源:origin: apache/flink
.print();
代码示例来源:origin: apache/flink
.print();
代码示例来源:origin: apache/flink
/**
 * Builds an event-time session-window job over {@code dataSource}, applies
 * {@code windowFunction} to each window, executes it and checks the two session counters
 * reported through the job's accumulators.
 *
 * @param dataSource source emitting the session events
 * @param windowFunction function applied to every fired session window
 * @throws Exception if job execution fails
 */
private void runTest(
        SourceFunction<SessionEvent<Integer, TestEventPayload>> dataSource,
        WindowFunction<SessionEvent<Integer, TestEventPayload>,
            String, Tuple, TimeWindow> windowFunction) throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> sessions =
        environment
            .addSource(dataSource)
            .keyBy("sessionKey")
            .window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)));

    // Long.MAX_VALUE acts as "no explicit lateness configured".
    if (ALLOWED_LATENESS_MS != Long.MAX_VALUE) {
        sessions = sessions.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
    }

    if (PURGE_WINDOW_ON_FIRE) {
        sessions = sessions.trigger(PurgingTrigger.of(EventTimeTrigger.create()));
    }

    sessions.apply(windowFunction).print();
    final JobExecutionResult jobResult = environment.execute();

    // Verify the overall event counts. Keep in mind that every late event that still falls
    // within the allowed lateness fires a window of its own.
    final long onTimeCount = (long) jobResult.getAccumulatorResult(SESSION_COUNTER_ON_TIME_KEY);
    Assert.assertEquals(
        (LATE_EVENTS_PER_SESSION + 1) * NUMBER_OF_SESSIONS * EVENTS_PER_SESSION,
        onTimeCount);

    final long lateCount = (long) jobResult.getAccumulatorResult(SESSION_COUNTER_LATE_KEY);
    Assert.assertEquals(
        NUMBER_OF_SESSIONS * (LATE_EVENTS_PER_SESSION * (LATE_EVENTS_PER_SESSION + 1) / 2),
        lateCount);
}
代码示例来源:origin: apache/flink
source.map(new TestMap<Long, Long>()).print();
fail();
} catch (Exception ignored) {}
source.flatMap(new TestFlatMap<Long, Long>()).print();
fail();
} catch (Exception ignored) {}
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
fail();
} catch (Exception ignored) {}
source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
fail();
} catch (Exception ignored) {}
.between(Time.milliseconds(10L), Time.milliseconds(10L))
.process(new TestProcessJoinFunction<>())
.print();
fail();
} catch (Exception ignored) {}
source.map(new TestMap<Long, Long>()).returns(Long.class).print();
source.flatMap(new TestFlatMap<Long, Long>()).returns(new TypeHint<Long>(){}).print();
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print();
source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
.returns(BasicTypeInfo.INT_TYPE_INFO).print();
source.connect(source).keyBy(new TestKeySelector<>(), new TestKeySelector<>(), Types.STRING);
source.coGroup(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);
代码示例来源:origin: apache/flink
/**
 * Configures event time, feeds elements created with {@code fromElements} (no timestamp
 * assigner is attached here) into a tumbling event-time window, and expects
 * {@code env.execute()} to throw.
 */
@Test
public void testErrorOnEventTimeWithoutTimestamps() {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(2);
    environment.getConfig().disableSysoutLogging();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    final DataStream<Tuple2<String, Integer>> input =
        environment.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

    // Reducer that simply keeps the first of the two values.
    final ReduceFunction<Tuple2<String, Integer>> keepFirst =
        new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
                return left;
            }
        };

    input
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(keepFirst)
        .print();

    try {
        environment.execute();
        // fail() raises an AssertionError, which the catch below does not intercept.
        fail("this should fail with an exception");
    } catch (Exception expected) {
        // expected
    }
}
代码示例来源:origin: apache/flink
/**
 * Configures processing time but uses a tumbling event-time window assigner, and
 * expects {@code env.execute()} to throw.
 */
@Test
public void testErrorOnEventTimeOverProcessingTime() {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(2);
    environment.getConfig().disableSysoutLogging();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    final DataStream<Tuple2<String, Integer>> input =
        environment.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

    // Reducer that simply keeps the first of the two values.
    final ReduceFunction<Tuple2<String, Integer>> keepFirst =
        new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
                return left;
            }
        };

    input
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(keepFirst)
        .print();

    try {
        environment.execute();
        // fail() raises an AssertionError, which the catch below does not intercept.
        fail("this should fail with an exception");
    } catch (Exception expected) {
        // expected
    }
}
代码示例来源:origin: streaming-olap/training
/**
 * Runs a job that reads from {@code LongSource}, keeps only odd values and prints them.
 * Every even value that is dropped increments the {@code filterOutNumber} counter
 * registered under the {@code minwenjun} metric group.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

    final RichFilterFunction<Integer> oddOnly = new RichFilterFunction<Integer>() {
        private Counter filterOutNumber;

        @Override
        public void open(Configuration parameters) throws Exception {
            filterOutNumber = getRuntimeContext()
                .getMetricGroup()
                .addGroup("minwenjun")
                .counter("filterOutNumber");
        }

        @Override
        public boolean filter(Integer value) throws Exception {
            final boolean even = value % 2 == 0;
            if (even) {
                // Count each element that is about to be filtered out.
                filterOutNumber.inc();
            }
            return !even;
        }
    };

    env.addSource(new LongSource())
        .setParallelism(2)
        .filter(oddOnly)
        .print();

    env.execute();
}
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Reads tab-separated edges from {@code movielens_10k_sorted.txt}, runs them through
 * {@code WeightedMatchingFlatMapper} with parallelism 1, prints the output and finally
 * reports the job's net runtime on standard out.
 *
 * @throws Exception if job execution fails
 */
public CentralizedWeightedMatching() throws Exception {
    final StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

    // Source: http://grouplens.org/datasets/movielens/
    @SuppressWarnings("serial")
    DataStream<Edge<Long, Long>> edgeStream = localEnv
        .readTextFile("movielens_10k_sorted.txt")
        .map(new MapFunction<String, Edge<Long, Long>>() {
            @Override
            public Edge<Long, Long> map(String line) throws Exception {
                final String[] fields = line.split("\t");
                final long source = Long.parseLong(fields[0]);
                // The second column is shifted by 1,000,000 and the third scaled by 10.
                final long target = Long.parseLong(fields[1]) + 1000000;
                final long weight = Long.parseLong(fields[2]) * 10;
                return new Edge<>(source, target, weight);
            }
        });

    final GraphStream<Long, NullValue, Long> graph = new SimpleEdgeStream<>(edgeStream, localEnv);

    graph.getEdges()
        .flatMap(new WeightedMatchingFlatMapper())
        .setParallelism(1)
        .print()
        .setParallelism(1);

    final JobExecutionResult result = localEnv.execute("Distributed Merge Tree Sandbox");
    System.out.println("Runtime: " + result.getNetRuntime());
}
代码示例来源:origin: dataArtisans/kafka-example
/**
 * Consumes strings from the Kafka topic named by the required {@code topic} parameter,
 * prefixes each message and prints it.
 *
 * @param args command-line arguments parsed by {@code ParameterTool}
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Set up the execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Parse the user-supplied parameters.
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    final String topic = parameterTool.getRequired("topic");

    final DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer082<>(topic, new SimpleStringSchema(), parameterTool.getProperties()));

    final MapFunction<String, String> prefixer = new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String message) throws Exception {
            return "Kafka and Flink says: " + message;
        }
    };

    // print() writes the contents of the stream to the TaskManager's standard out stream.
    // The rebalance() call repartitions the data so that all machines see the messages
    // (for example when "num kafka partitions" < "num flink operators").
    messageStream
        .rebalance()
        .map(prefixer)
        .print();

    env.execute();
}
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Aggregates the graph stream with {@code ConnectedComponents} and prints the flattened
 * result in time windows of {@code printWindowTime} milliseconds.
 *
 * @param args command-line arguments handed to {@code parseParameters}
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Stop right away when the arguments could not be parsed.
    if (!parseParameters(args)) {
        return;
    }

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final GraphStream<Long, NullValue, NullValue> graph = getGraphStream(env);

    final DataStream<DisjointSet<Long>> components =
        graph.aggregate(new ConnectedComponents<Long, NullValue>(mergeWindowTime));

    // Flatten the elements of the disjoint set and print them
    // in windows of printWindowTime.
    components
        .flatMap(new FlattenSet())
        .keyBy(0)
        .timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
        .fold(new Tuple2<Long, Long>(0L, 0L), new IdentityFold())
        .print();

    env.execute("Streaming Connected Components");
}
代码示例来源:origin: vasia/gelly-streaming
/**
 * Aggregates the graph stream with {@code Spanner} and prints the flattened result
 * in time windows of {@code printWindowTime} milliseconds.
 *
 * @param args command-line arguments handed to {@code parseParameters}
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Stop right away when the arguments could not be parsed.
    if (!parseParameters(args)) {
        return;
    }

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final GraphStream<Long, NullValue, NullValue> graph = getGraphStream(env);

    final DataStream<AdjacencyListGraph<Long>> spannerStream =
        graph.aggregate(new Spanner<Long, NullValue>(mergeWindowTime, k));

    // Flatten the elements of the spanner and print them
    // in windows of printWindowTime.
    spannerStream
        .flatMap(new FlattenSet())
        .keyBy(0)
        .timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
        .fold(new Tuple2<>(0L, 0L), new IdentityFold())
        .print();

    env.execute("Streaming Spanner");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
/**
 * Reads connected-car events from the file given by the required {@code input}
 * parameter, passes them per {@code carId} through {@code SortFunction} and prints the
 * output. The job runs with event time and parallelism 1.
 *
 * @param args command-line arguments; {@code --input} is required
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Read the parameters.
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String inputPath = params.getRequired("input");

    // Set up the streaming execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // Connect to the data file and turn each line into an event.
    final DataStream<ConnectedCarEvent> carEvents = env
        .readTextFile(inputPath)
        .map((String row) -> ConnectedCarEvent.fromString(row))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // Sort the events per car.
    carEvents
        .keyBy((ConnectedCarEvent carEvent) -> carEvent.carId)
        .process(new SortFunction())
        .print();

    env.execute("Sort Connected Car Events");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
/**
 * Reads connected-car events from the file given by the required {@code input}
 * parameter, groups them per {@code carId} into event-time session windows with a
 * 15-second gap, applies {@code CreateGapSegment} and prints the output.
 *
 * @param args command-line arguments; {@code --input} is required
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Read the parameters.
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String inputPath = params.getRequired("input");

    // Set up the streaming execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Connect to the data file and turn each line into an event.
    final DataStream<ConnectedCarEvent> carEvents = env
        .readTextFile(inputPath)
        .map((String row) -> ConnectedCarEvent.fromString(row))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // Find the segments.
    carEvents
        .keyBy("carId")
        .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
        .apply(new CreateGapSegment())
        .print();

    env.execute("Driving Sessions");
}
代码示例来源:origin: dataArtisans/flink-training-exercises
/**
 * Reads connected-car events from the file given by the required {@code input}
 * parameter and, per {@code carId}, runs them through a global window driven by
 * {@code SegmentingOutOfOrderTrigger} and {@code SegmentingEvictor}. The result of
 * {@code CreateStoppedSegment} is printed.
 *
 * @param args command-line arguments; {@code --input} is required
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // Read the parameters.
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String inputPath = params.getRequired("input");

    // Set up the streaming execution environment.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Connect to the data file and turn each line into an event.
    final DataStream<ConnectedCarEvent> carEvents = env
        .readTextFile(inputPath)
        .map((String row) -> ConnectedCarEvent.fromString(row))
        .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // Find the segments.
    carEvents
        .keyBy("carId")
        .window(GlobalWindows.create())
        .trigger(new SegmentingOutOfOrderTrigger())
        .evictor(new SegmentingEvictor())
        .apply(new CreateStoppedSegment())
        .print();

    env.execute("Driving Segments");
}
内容来源于网络,如有侵权,请联系作者删除!