Flink Kafka Source


Add the Kafka connector dependency to the project's pom.xml (the ${scala.binary.version} and ${flink.version} placeholders come from the project's Maven properties):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Then run the following program directly:

package com.gosuncn;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class WordCountStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings: broker address and consumer group id
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.8.101:9092");
        properties.setProperty("group.id", "test");

        // Consume the "gosuncn" topic, deserializing each record as a plain String
        FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>("gosuncn", new SimpleStringSchema(), properties);

        // Register the Kafka consumer as a source and print every record to stdout
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        dataStreamSource.print();

        env.execute("WordCountStreamingJob");
    }
}
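By default the consumer starts from the offsets committed for the configured group.id. If a different starting position is needed, the Flink Kafka consumer exposes setters for it. The snippet below is a minimal sketch that would go right after kafkaSource is created in the program above; pick exactly one of the calls:

// Start from the earliest record available in each partition
kafkaSource.setStartFromEarliest();

// Or: start from the latest records, ignoring history
// kafkaSource.setStartFromLatest();

// Or: start from the consumer group's committed offsets (the default)
// kafkaSource.setStartFromGroupOffsets();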

Test results (the `6>` prefix is the index of the parallel print subtask that emitted the record):

6> Hello World
6> Hello Flink
6> Flink Spark
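For reference, output like the above can be generated by writing a few lines to the gosuncn topic, either with the Kafka console producer or with a small Java producer. The class below is a hypothetical helper, not part of the original post; it assumes a kafka-clients dependency on the classpath and the same broker address as above.

package com.gosuncn;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Hypothetical test helper: sends a few sample lines to the "gosuncn" topic.
public class TestMessageProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.8.101:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(new ProducerRecord<>("gosuncn", "Hello World"));
            producer.send(new ProducerRecord<>("gosuncn", "Hello Flink"));
            producer.send(new ProducerRecord<>("gosuncn", "Flink Spark"));
        }
    }
}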
