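I am using Flink and trying to store data in HDFS. The data comes from Kafka: Flink consumes it and then writes it to HDFS. However, the data only becomes visible in HDFS after I terminate the Flink application. What I want is to see the data in HDFS without terminating the application.
My Flink code is below; it is very simple. Is there any configuration that lets me write data to HDFS at fixed time intervals without terminating the Flink application?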
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink_consumer");

        // Read string records from the "flink-demo" topic
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties));

        // Prefix every record
        DataStream<String> output = stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Stream Value: " + value;
            }
        });

        // Write to HDFS, one bucket directory per minute
        RollingSink<String> sink = new RollingSink<String>("/user/sclee/flink/stream");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(1024 * 1024 * 400); // roll to a new part file at 400 MB

        output.addSink(sink);
        env.execute();
    }
}
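To make the two sink settings above concrete, here is a minimal sketch in plain Java (no Flink dependency) of what they amount to. It assumes the bucketer formats the current time with the given pattern and appends the result to the base path; `BucketPathSketch` and `bucketPath` are hypothetical names used only for illustration, and the time zone is pinned to UTC so the output is reproducible. It is not the connector's actual implementation.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

class BucketPathSketch {

    // Hypothetical helper: derives a bucket directory from a base path,
    // a date pattern, and a timestamp in epoch milliseconds.
    static String bucketPath(String basePath, String pattern, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: fixed zone for reproducibility
        return basePath + "/" + fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        String base = "/user/sclee/flink/stream";
        String pattern = "yyyy-MM-dd--HHmm";

        // Records within the same minute land in the same bucket directory.
        System.out.println(bucketPath(base, pattern, 0L));      // /user/sclee/flink/stream/1970-01-01--0000
        System.out.println(bucketPath(base, pattern, 59_000L)); // /user/sclee/flink/stream/1970-01-01--0000

        // One minute later a new bucket directory is used.
        System.out.println(bucketPath(base, pattern, 60_000L)); // /user/sclee/flink/stream/1970-01-01--0001

        // The batch size passed to setBatchSize, in bytes.
        long batchSizeBytes = 1024L * 1024L * 400L;
        System.out.println(batchSizeBytes); // 419430400
    }
}
```

Under these assumptions the pattern "yyyy-MM-dd--HHmm" yields a new directory every minute, while the 400 MB batch size means a part file is not rolled on size until roughly 419 million bytes have been written to it, which a low-volume test topic may never reach while the job is running.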