ApacheFlink将数据流(源代码)转换为列表?

yrefmtwq  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(296)

我的问题是如何从 DataStreamList 例如,为了能够遍历它。
代码如下所示:

package flinkoracle;

//imports
//....

public class FlinkOracle {

    final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);

    public static void main(String[] args) {
        LOG.info("Starting...");
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        //get the source from Oracle DB
        DataStream<?> source = env
                .createInput(JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("oracle.jdbc.driver.OracleDriver")
                        .setDBUrl("jdbc:oracle:thin:@localhost:1521")
                        .setUsername("user")
                        .setPassword("password")
                        .setQuery("select * from  table1")
                        .setRowTypeInfo(rowTypeInfo)
                        .finish());

        source.print().setParallelism(1);

        try {
            LOG.info("----------BEGIN----------");
            env.execute();
            LOG.info("----------END----------");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End...");
    }

}

提前多谢了。br塔马斯

ie3xauqp

ie3xauqp1#

flink提供了一个迭代器接收器来收集数据流结果,以便进行测试和调试。可按如下方式使用:

import org.apache.flink.contrib.streaming.DataStreamUtils;

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

您可以将迭代器复制到如下新列表:

while (iter.hasNext())
    list.add(iter.next());

flink还在datastream上提供了一系列简单的write*()方法,这些方法主要用于调试目的。目标系统的数据刷新取决于outputformat的实现。这意味着并非所有发送到outputformat的元素都会立即显示在目标系统中。注意:这些write*()方法不参与flink的检查点,在失败的情况下,这些记录可能会丢失。

writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket

来源:链接。
要使用datastreamutils,可能需要添加以下依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>

相关问题