Usage of the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.writeAsText() method, with code examples

This article collects a number of Java code examples for the org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.writeAsText() method and shows how SingleOutputStreamOperator.writeAsText() is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are reasonably representative and should serve as a useful reference. The details of the SingleOutputStreamOperator.writeAsText() method are as follows:
Package path: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
Class name: SingleOutputStreamOperator
Method name: writeAsText

About SingleOutputStreamOperator.writeAsText

writeAsText(String path) writes every element of the stream to the given path as a line of text, using the element's toString() representation. The overload writeAsText(String path, FileSystem.WriteMode writeMode) additionally controls whether an existing file is overwritten (WriteMode.OVERWRITE) or kept (WriteMode.NO_OVERWRITE). In newer Flink releases this method is marked as deprecated in favor of the file sink connectors (StreamingFileSink / FileSink).
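
Below is a minimal, self-contained sketch of typical usage. The input elements, output path, and job name are placeholders invented for illustration; the writeAsText and FileSystem.WriteMode.OVERWRITE calls themselves are the standard Flink DataStream API.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteAsTextSketch {

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // With parallelism 1 the sink produces a single file; with higher parallelism
    // the path becomes a directory containing one file per parallel subtask.
    env.setParallelism(1);

    // Each element is converted to a line of text via toString() before it is written.
    SingleOutputStreamOperator<String> upperCased = env
      .fromElements("flink", "writeAsText", "example")
      .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
          return value.toUpperCase();
        }
      });

    // Placeholder output path; OVERWRITE replaces the file if it already exists.
    upperCased.writeAsText("/tmp/write-as-text-output.txt", FileSystem.WriteMode.OVERWRITE);

    env.execute("writeAsText sketch");
  }
}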

Code examples

Code example source: apache/flink

    // Fragment: the enclosing lambda builds the String record that is written out.
    return message + ":" + messageFromPropsFile + ":" + orderedProperties;
  })
  // Write the records to the path given by the "output" parameter, replacing any existing file.
  .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);

Code example source: apache/flink

public static void main(String[] args) throws Exception {
  final ParameterTool params = ParameterTool.fromArgs(args);
  final Path inputFile = Paths.get(params.getRequired("inputFile"));
  final Path inputDir = Paths.get(params.getRequired("inputDir"));
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  // Register a file and a directory in Flink's distributed cache.
  env.registerCachedFile(inputFile.toString(), "test_data", false);
  env.registerCachedFile(inputDir.toString(), "test_dir", false);
  final Path containedFile;
  try (Stream<Path> files = Files.list(inputDir)) {
    containedFile = files.findAny().orElseThrow(() -> new RuntimeException("Input directory must not be empty."));
  }
  // Map a single dummy element using the cached-file metadata and write the result
  // as text, overwriting the output of any previous run.
  env.fromElements(1)
    .map(new TestMapFunction(
      inputFile.toAbsolutePath().toString(),
      Files.size(inputFile),
      inputDir.toAbsolutePath().toString(),
      containedFile.getFileName().toString()))
    .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
  env.execute("Distributed Cache Via Blob Test Program");
}

Code example source: vasia/gelly-streaming

public static void main(String[] args) throws Exception {
  if (!parseParameters(args)) {
    return;
  }
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Tuple3<Integer, Integer, EventType>> edges = getGraphStream(env);
  // 1. emit (vertexID, 1) or (vertexID, -1) for addition or deletion
  edges.flatMap(new EmitVerticesWithChange())
      // group by vertex ID and maintain degree per vertex
      .keyBy(0).flatMap(new VertexDegreeCounts())
      // group by degree and emit current count
      .keyBy(0).map(new DegreeDistributionMap())
      .writeAsText(resultPath);
  env.execute("Streaming Degree Distribution");
}

Code example source: apache/bahir-flink

@Test
public void testSimpleWriteAndRead() throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  DataStream<Event> input = env.fromElements(
    Event.of(1, "start", 1.0),
    Event.of(2, "middle", 2.0),
    Event.of(3, "end", 3.0),
    Event.of(4, "start", 4.0),
    Event.of(5, "middle", 5.0),
    Event.of(6, "end", 6.0)
  );
  String path = tempFolder.newFile().toURI().toString();
  // Apply an identity map through transform() + StreamMap, then write the events as text.
  input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
    @Override
    public Event map(Event event) throws Exception {
      return event;
    }
  })).writeAsText(path);
  env.execute();
  // The written output should contain one line per input event.
  Assert.assertEquals(6, getLineCount(path));
}
