Spark Kafka consumer only receives messages after it is stopped

jmp7cifd, posted 2021-05-19 in Spark

When I send messages to Kafka they are delivered, but my consumer only reads them after I stop it. Only then do all the events show up in the log and get passed on to the Kafka producer.

public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("kafakTest2");

    JavaStreamingContext streamingContext = new JavaStreamingContext(
            sparkConf, Durations.seconds(10));

    // new-consumer style parameters (note: not passed to createStream below)
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark_group1");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("partition.assignment.strategy", "range");

    // topic -> number of receiver threads
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("dev_parcels_belt_scan", 1);

    // producer settings used to forward records to the "spark" topic
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka.kafka:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("partition.assignment.strategy", "range");

    // receiver-based Kafka stream (0.8 integration), created via ZooKeeper
    JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
            streamingContext, "kafka-zookeeper.kafka:2181", "group1", topics);

    lines.foreachRDD(rdd -> {
        rdd.foreachPartition(iterator -> {
            // one producer per partition; every record is forwarded to "spark"
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            while (iterator.hasNext()) {
                Tuple2<String, String> next = iterator.next();
                System.out.println(next._1() + " --> " + next._2());
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("spark", "key", next._2());
                producer.send(record);
            }
        });
    });

    streamingContext.start();
    streamingContext.awaitTermination();
}
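One side note on the code above: the kafkaParams map is built but never passed to KafkaUtils.createStream, which is the ZooKeeper-based receiver API from the 0-8 integration. Those settings (bootstrap.servers, deserializer classes, group.id) are the new-consumer configuration that the spark-streaming-kafka-0-10 direct API expects. A minimal sketch of that variant, assuming the 0-10 artifact is on the classpath and reusing the topic and parameters from above:

// Sketch only, not the code actually deployed: 0-10 direct stream driven by the
// kafkaParams defined above. Imports go at the top of the file.
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// inside main(), in place of the createStream call:
JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                        Arrays.asList("dev_parcels_belt_scan"), kafkaParams));

// each ConsumerRecord exposes key(), value(), topic(), partition() and offset()
stream.foreachRDD(rdd ->
        rdd.foreach(record -> System.out.println(record.key() + " --> " + record.value())));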

After I send a new event to the Kafka topic, the following appears in the log:

Block input-0-1603026874600 stored as values in memory (estimated size 335.0 B, free 413.8 MB)
20/10/18 13:14:34 INFO BlockManagerInfo: Added input-0-1603026874600 in memory on spark-master-0.spark-master-headless.spark-dev.svc.cluster.local:35493 (size: 335.0 B, free: 413.9 MB)
20/10/18 13:14:34 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/10/18 13:14:34 WARN BlockManager: Block input-0-1603026874600 replicated to only 0 peer(s) instead of 1 peers
20/10/18 13:14:34 INFO BlockGenerator: Pushed block input-0-1603026874600
20/10/18 13:14:40 INFO JobScheduler: Added jobs for time 1603026880000 ms

Only after I stop the Spark job do I see the events in the log, and only then are they sent to the other topic:

test --> {"Id":1,"StateId":1635749,"barcode":"10099817543400111299","trackingNumber":null,"timeStamp":"2020-10-09T13:29:05Z"}
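Since the forwarded events only show up once the job is shut down, one detail worth double-checking in the snippet above is that the per-partition KafkaProducer is never flushed or closed; send() is asynchronous and only hands the record to the producer's internal buffer. A small sketch of the same loop with an explicit flush and close, reusing props from above:

// Sketch only: same forwarding loop, but the producer is opened in
// try-with-resources so it is flushed and closed when the partition finishes.
rdd.foreachPartition(iterator -> {
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        while (iterator.hasNext()) {
            Tuple2<String, String> next = iterator.next();
            producer.send(new ProducerRecord<>("spark", "key", next._2()));
        }
        producer.flush(); // block until buffered records have been sent
    } // close() also waits for in-flight requests to complete
});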

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
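If the 0-10 direct-stream sketch above were used instead of createStream, the 0-8 integration would be swapped for the matching 0-10 artifact (same Scala 2.11 build and 2.4.3 version):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.3</version>
</dependency>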
