Usage of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.print() method, with code examples


This article collects Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.print() method and shows how it is used in practice. The examples were extracted from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, and should make useful references. Details of SingleOutputStreamOperator.print() are as follows:
Package: org.apache.flink.streaming.api.datastream
Class: SingleOutputStreamOperator
Method: print

About SingleOutputStreamOperator.print

print() adds a sink that writes every element of the stream to the standard output of the TaskManager(s) executing it, using each element's toString() representation. It returns a DataStreamSink that can be configured further (for example with setParallelism(...), as several examples below do), and is intended primarily for testing and debugging.
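
A minimal, self-contained sketch of a typical call (the class name and input values are illustrative, not taken from the excerpted projects):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // fromElements(...) returns a SingleOutputStreamOperator
    DataStream<String> stream = env.fromElements("hello", "world");

    // print() attaches a stdout sink; output appears in the TaskManager logs
    stream.print();

    env.execute("Print Example");
  }
}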

Code examples

Example source: apache/flink

out.collect(new Tuple2<>(value, 1));
}).keyBy(0).sum(1).print();
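
The fragment above is the tail of a word-count pipeline. A hedged reconstruction of the surrounding code, assuming the usual imports (Tuple2, Collector, Types) and an env in scope; the input and tokenizer are illustrative guesses, not the original source:

DataStream<String> text = env.fromElements("to be or not to be");

text.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
      for (String value : line.split("\\s")) {
        out.collect(new Tuple2<>(value, 1));
      }
    })
    // generic lambdas lose their type information to erasure, so a hint is required
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .sum(1)
    .print();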

Example source: apache/flink

private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
  DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);

  stream
      .filter(ignored -> false).setParallelism(parallelism)
      .startNewChain()
      .print().setParallelism(parallelism);
}

Example source: apache/flink

@Test(expected = IllegalStateException.class)
public void testExecutionWithEmptyIteration() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Integer> source = env.fromElements(1, 10).map(noOpIntMap);
  IterativeStream<Integer> iter1 = source.iterate();
  // the feedback edge is never defined via closeWith(), so building the job fails
  iter1.map(noOpIntMap).print();
  env.execute();
}
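
For contrast, a minimal sketch of an iteration that is properly closed, reusing source and noOpIntMap from the test above (the feedback predicate is an arbitrary assumption):

IterativeStream<Integer> iteration = source.iterate();
DataStream<Integer> body = iteration.map(noOpIntMap);

// elements matching the predicate are fed back into the loop;
// the rest leave the iteration and are printed
iteration.closeWith(body.filter(v -> v > 0));
body.filter(v -> v <= 0).print();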

Example source: apache/flink

.filter(dummyFilter).slotSharingGroup("group 1")
.filter(dummyFilter).startNewChain()
.print().disableChaining();

.filter(dummyFilter).slotSharingGroup("group 2")
.filter(dummyFilter).startNewChain()
.print().disableChaining();

Example source: apache/flink

.print();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());

Example source: apache/flink

@Test
public void testNettyEpoll() throws Exception {
  MiniClusterWithClientResource cluster = trySetUpCluster();
  try {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(NUM_TASK_MANAGERS);
    env.getConfig().disableSysoutLogging();
    DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42);
    input.keyBy(new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
          return value;
        }
      })
      .sum(0)
      .print();
    env.execute();
  }
  finally {
    cluster.after();
  }
}

Example source: apache/flink

private void runTest(
    SourceFunction<SessionEvent<Integer, TestEventPayload>> dataSource,
    WindowFunction<SessionEvent<Integer, TestEventPayload>,
        String, Tuple, TimeWindow> windowFunction) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> windowedStream =
      env.addSource(dataSource).keyBy("sessionKey")
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)));
  if (ALLOWED_LATENESS_MS != Long.MAX_VALUE) {
    windowedStream = windowedStream.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
  }
  if (PURGE_WINDOW_ON_FIRE) {
    windowedStream = windowedStream.trigger(PurgingTrigger.of(EventTimeTrigger.create()));
  }
  windowedStream.apply(windowFunction).print();
  JobExecutionResult result = env.execute();
  // Check that the overall event counts match our expectations. Remember that each late
  // event arriving within the allowed lateness fires the window again!
  Assert.assertEquals(
    (LATE_EVENTS_PER_SESSION + 1) * NUMBER_OF_SESSIONS * EVENTS_PER_SESSION,
    (long) result.getAccumulatorResult(SESSION_COUNTER_ON_TIME_KEY));
  Assert.assertEquals(
    NUMBER_OF_SESSIONS * (LATE_EVENTS_PER_SESSION * (LATE_EVENTS_PER_SESSION + 1) / 2),
    (long) result.getAccumulatorResult(SESSION_COUNTER_LATE_KEY));
}

Example source: apache/flink

// Without explicit type information, these generic transformations cannot be
// type-inferred, so each of the following pipelines fails:
try {
  source.map(new TestMap<Long, Long>()).print();
  fail();
} catch (Exception ignored) {}
try {
  source.flatMap(new TestFlatMap<Long, Long>()).print();
  fail();
} catch (Exception ignored) {}
try {
  source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
  fail();
} catch (Exception ignored) {}
try {
  source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
  fail();
} catch (Exception ignored) {}
try {
  // (the head of this interval-join chain is truncated in the original excerpt)
    .between(Time.milliseconds(10L), Time.milliseconds(10L))
    .process(new TestProcessJoinFunction<>())
    .print();
  fail();
} catch (Exception ignored) {}

// Supplying type information explicitly via returns(...) makes the same pipelines work:
source.map(new TestMap<Long, Long>()).returns(Long.class).print();
source.flatMap(new TestFlatMap<Long, Long>()).returns(new TypeHint<Long>(){}).print();
source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns(BasicTypeInfo.INT_TYPE_INFO).print();
source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
    .returns(BasicTypeInfo.INT_TYPE_INFO).print();
source.connect(source).keyBy(new TestKeySelector<>(), new TestKeySelector<>(), Types.STRING);
source.coGroup(source).where(new TestKeySelector<>(), Types.STRING).equalTo(new TestKeySelector<>(), Types.STRING);

Example source: apache/flink

@Test
public void testErrorOnEventTimeWithoutTimestamps() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  env.getConfig().disableSysoutLogging();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<Tuple2<String, Integer>> source1 =
      env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
  source1
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
          return value1;
        }
      })
      .print();
  try {
    env.execute();
    fail("this should fail with an exception");
  } catch (Exception e) {
    // expected
  }
}
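
The failure is expected because event-time windows require timestamps and watermarks. A hedged sketch of a fix, assigning ascending timestamps from the tuple's integer field (an arbitrary choice for illustration):

source1
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
      @Override
      public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
        return element.f1; // hypothetical: treat the count field as an event timestamp
      }
    })
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) -> a)
    .print();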

Example source: apache/flink

@Test
public void testErrorOnEventTimeOverProcessingTime() {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  env.getConfig().disableSysoutLogging();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  DataStream<Tuple2<String, Integer>> source1 =
      env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
  source1
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
          return value1;
        }
      })
      .print();
  try {
    env.execute();
    fail("this should fail with an exception");
  } catch (Exception e) {
    // expected
  }
}
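
Here the job fails because an event-time window assigner is used under the ProcessingTime characteristic; a minimal sketch of the processing-time counterpart:

source1
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) -> a)
    .print();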

Example source: streaming-olap/training

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment().setParallelism(1);

    env.addSource(new LongSource())
    .setParallelism(2)
    .filter(new RichFilterFunction<Integer>() {
      private Counter filterOutNumber;
      @Override
      public void open(Configuration parameters) throws Exception {
         filterOutNumber = getRuntimeContext()
            .getMetricGroup()
            .addGroup("minwenjun")
            .counter("filterOutNumber");
      }

      @Override
      public boolean filter(Integer integer) throws Exception {
        if (integer % 2 == 0) {
          filterOutNumber.inc();
        }
        return !(integer % 2 == 0);
      }
    }).print();

    env.execute();
  }

Example source: vasia/gelly-streaming

public CentralizedWeightedMatching() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  // Source: http://grouplens.org/datasets/movielens/
  @SuppressWarnings("serial")
  DataStream<Edge<Long, Long>> edges = env
      .readTextFile("movielens_10k_sorted.txt")
      .map(new MapFunction<String, Edge<Long, Long>>() {
        @Override
        public Edge<Long, Long> map(String s) throws Exception {
          String[] args = s.split("\t");
          long src = Long.parseLong(args[0]);
          long trg = Long.parseLong(args[1]) + 1000000;
          long val = Long.parseLong(args[2]) * 10;
          return new Edge<>(src, trg, val);
        }
      });
  GraphStream<Long, NullValue, Long> graph = new SimpleEdgeStream<>(edges, env);
  graph.getEdges()
      .flatMap(new WeightedMatchingFlatMapper()).setParallelism(1)
      .print().setParallelism(1);
  JobExecutionResult res = env.execute("Distributed Merge Tree Sandbox");
  long runtime = res.getNetRuntime();
  System.out.println("Runtime: " + runtime);
}

Example source: dataArtisans/kafka-example

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    // print() will write the contents of the stream to the TaskManager's standard out stream.
    // The rebalance() call causes a repartitioning of the data so that all machines
    // see the messages (for example, in cases where "num kafka partitions" < "num flink operators").
    messageStream.rebalance().map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
      }
    }).print();

    env.execute();
  }
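A typical invocation (all values hypothetical) passes the topic and the Kafka connection settings as program arguments, which ParameterTool turns into the consumer's Properties:

flink run kafka-example.jar --topic test --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id flink-demo

FlinkKafkaConsumer082 targets Kafka 0.8, which is why a ZooKeeper address is needed alongside the broker list.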

Example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  GraphStream<Long, NullValue, NullValue> edges = getGraphStream(env);
  DataStream<DisjointSet<Long>> cc = edges.aggregate(new ConnectedComponents<Long, NullValue>(mergeWindowTime));
  // flatten the elements of the disjoint set and print
  // in windows of printWindowTime
  cc.flatMap(new FlattenSet()).keyBy(0)
      .timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
      .fold(new Tuple2<Long, Long>(0L, 0L), new IdentityFold()).print();
  env.execute("Streaming Connected Components");
}

Example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  GraphStream<Long, NullValue, NullValue> edges = getGraphStream(env);
  DataStream<AdjacencyListGraph<Long>> spanner = edges.aggregate(new Spanner<Long, NullValue>(mergeWindowTime, k));
  // flatten the elements of the spanner and print
  // in windows of printWindowTime
  spanner.flatMap(new FlattenSet())
      .keyBy(0).timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
      .fold(new Tuple2<>(0L, 0L), new IdentityFold()).print();
  env.execute("Streaming Spanner");
}
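
Note that fold() has since been deprecated in Flink. Assuming IdentityFold simply passes elements through, a hedged reduce()-based sketch of the same printing stage might be:

spanner.flatMap(new FlattenSet())
    .keyBy(0)
    .timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
    .reduce((a, b) -> b) // keep the latest element per key and window
    .print();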

Example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  env.setParallelism(1);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // map to events
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  // sort events
  events.keyBy((ConnectedCarEvent event) -> event.carId)
      .process(new SortFunction())
      .print();
  env.execute("Sort Connected Car Events");
}

Example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // find segments
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  events.keyBy("carId")
      .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
      .apply(new CreateGapSegment())
      .print();
  env.execute("Driving Sessions");
}

Example source: dataArtisans/flink-training-exercises

public static void main(String[] args) throws Exception {
  // read parameters
  ParameterTool params = ParameterTool.fromArgs(args);
  String input = params.getRequired("input");
  // set up streaming execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  // connect to the data file
  DataStream<String> carData = env.readTextFile(input);
  // map to events
  DataStream<ConnectedCarEvent> events = carData
      .map((String line) -> ConnectedCarEvent.fromString(line))
      .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
  // find segments
  events.keyBy("carId")
      // GlobalWindows never fires on its own; the custom trigger below decides when to emit
      .window(GlobalWindows.create())
      .trigger(new SegmentingOutOfOrderTrigger())
      .evictor(new SegmentingEvictor())
      .apply(new CreateStoppedSegment())
      .print();
  env.execute("Driving Segments");
}
