When I send messages to Kafka they are delivered, but my consumer only reads them after I stop the streaming job. Only then do all the events show up in the log and get forwarded by the Kafka producer.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaTest2 { // class name assumed; not shown in the original snippet

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafakTest2");
        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(10));

        // Consumer parameters (note: this map is never passed to createStream below)
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark_group1");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("partition.assignment.strategy", "range");

        // Topic -> number of receiver threads
        Map<String, Integer> topics = new HashMap<>();
        topics.put("dev_parcels_belt_scan", 1);

        // Producer configuration for re-publishing the consumed events
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.kafka:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partition.assignment.strategy", "range");

        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
                streamingContext, "kafka-zookeeper.kafka:2181", "group1", topics);

        lines.foreachRDD(rdd -> rdd.foreachPartition(iterator -> {
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            while (iterator.hasNext()) {
                Tuple2<String, String> next = iterator.next();
                System.out.println(next._1() + " --> " + next._2());
                producer.send(new ProducerRecord<>("spark", "key", next._2()));
            }
            producer.close(); // flush buffered records before the partition task ends
        }));

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
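Incidentally, the kafkaParams map above is built but never used: createStream only receives the ZooKeeper quorum, group id and topic map. A sketch of a direct stream that would actually consume with those parameters could look like this (this is only an assumption on my part; it requires the spark-streaming-kafka-0-10 integration instead of the 0-8 one declared in the pom, and the classes come from org.apache.spark.streaming.kafka010):

```java
// Sketch only - assumes spark-streaming-kafka-0-10_2.11 on the classpath;
// streamingContext and kafkaParams are the objects built in main() above.
JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                        Collections.singletonList("dev_parcels_belt_scan"),
                        kafkaParams));

// Each batch yields ConsumerRecord objects instead of Tuple2 pairs.
stream.foreachRDD(rdd -> rdd.foreach(record ->
        System.out.println(record.key() + " --> " + record.value())));
```

With the direct stream there is no separate receiver thread, so the topic map with its thread count is no longer needed.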
After I send a new event to the Kafka topic, this appears in the log:
Block input-0-1603026874600 stored as values in memory (estimated size 335.0 B, free 413.8 MB)
20/10/18 13:14:34 INFO BlockManagerInfo: Added input-0-1603026874600 in memory on spark-master-0.spark-master-headless.spark-dev.svc.cluster.local:35493 (size: 335.0 B, free: 413.9 MB)
20/10/18 13:14:34 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
20/10/18 13:14:34 WARN BlockManager: Block input-0-1603026874600 replicated to only 0 peer(s) instead of 1 peers
20/10/18 13:14:34 INFO BlockGenerator: Pushed block input-0-1603026874600
20/10/18 13:14:40 INFO JobScheduler: Added jobs for time 1603026880000 ms
Only after I stop the Spark job do I see the event in the log, and the event is sent to the other topic:
test --> {"Id":1,"StateId":1635749,"barcode":"10099817543400111299","trackingNumber":null,"timeStamp":"2020-10-09T13:29:05Z"}
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
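For reference, the 0-8 integration used above is deprecated (and removed in Spark 3.0); switching to the direct-stream API sketched earlier would mean replacing that dependency with the 0-10 artifact. The version here is my assumption, matching the 2.4.3 used for the 0-8 artifact:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
```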