我的问题是如何从 DataStream
到 List
例如,为了能够遍历它。
代码如下所示:
package flinkoracle;
//imports
//....
public class FlinkOracle {
final static Logger LOG = LoggerFactory.getLogger(FlinkOracle.class);
public static void main(String[] args) {
LOG.info("Starting...");
// get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
//get the source from Oracle DB
DataStream<?> source = env
.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("oracle.jdbc.driver.OracleDriver")
.setDBUrl("jdbc:oracle:thin:@localhost:1521")
.setUsername("user")
.setPassword("password")
.setQuery("select * from table1")
.setRowTypeInfo(rowTypeInfo)
.finish());
source.print().setParallelism(1);
try {
LOG.info("----------BEGIN----------");
env.execute();
LOG.info("----------END----------");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOG.info("End...");
}
}
提前多谢了。br塔马斯
1条答案
按热度按时间ie3xauqp1#
flink提供了一个迭代器接收器来收集数据流结果,以便进行测试和调试。可按如下方式使用:
您可以将迭代器复制到如下新列表:
flink还在datastream上提供了一系列简单的write*()方法,这些方法主要用于调试目的。目标系统的数据刷新取决于outputformat的实现。这意味着并非所有发送到outputformat的元素都会立即显示在目标系统中。注意:这些write*()方法不参与flink的检查点,在失败的情况下,这些记录可能会丢失。
来源:链接。
要使用datastreamutils,可能需要添加以下依赖项: