kafka 2.3.0生产者和消费者

efzxgjgh  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(579)

我是Kafka的新手,想用Kafka2.3实现一个生产者/消费者应用程序。
我下载了Kafka2.3并安装在我的ubuntu服务器上。
我在网上找到了一些代码,并在idea的笔记本电脑上构建了它,但消费者无法获得任何信息。
我已经检查了我的服务器上有主题的主题信息。
我有用 kafka-console-consumer 若要检查此主题,已成功获取主题的值,但未与我的消费者一起获取。
我的消费者怎么了?

制作人

package com.phitrellis.tool

import java.util.Properties
import java.util.concurrent.{Future, TimeUnit}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._

object MyKafkaProducer extends App {

  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "*:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("acks", "all")

    new KafkaProducer[String, String](props)
  }

  def writeToKafka(topic: String): Unit = {
    val producer = createKafkaProducer()
    val record = new ProducerRecord[String, String](topic, "key", "value22222222222")
    println("start")
    producer.send(record)
    producer.close()
    println("end")
  }

  writeToKafka("phitrellis")

}

消费者

package com.phitrellis.tool

import java.util
import java.util.Properties
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object MyKafkaConsumer extends App {

  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "*:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    //    props.put("auto.offset.reset", "latest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("group.id", "test")

    new KafkaConsumer[String, String](props)
  }

  def consumeFromKafka(topic: String) = {

    val consumer: KafkaConsumer[String, String] = createKafkaConsumer()
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val records = consumer.poll(Duration.ofSeconds(2)).asScala.iterator
      println("true")
      for (record <- records){
        print(record.value())
      }
    }
  }

  consumeFromKafka("phitrellis")

}
i34xakig

i34xakig1#

这里有两部分。生产方和消费者。
你什么也没说制作人,所以我们假设它确实有用。但是,你检查过服务器了吗?您可以检查kafka日志文件,看看是否有关于这些特定主题/分区的数据。
在使用者方面,为了验证,您应该尝试使用来自同一主题的命令行,以确保数据在其中。在下面的链接中查找“kafka consumer console”,并按照这些步骤进行操作。
http://cloudurable.com/blog/kafka-tutorial-kafka-from-command-line/index.html
如果主题中有数据,那么运行该命令应该会得到数据。如果不是,那么它将只是“挂起”,因为它正在等待数据写入主题。
此外,您可以尝试使用这些命令行工具生成同一主题,以确保集群配置正确、具有正确的地址和端口、端口未被阻止等。

nwlls2ji

nwlls2ji2#

消费者代码中的两行代码至关重要:

props.put("auto.offset.reset", "latest")
props.put("group.id", "test")

要从题目的开头开始读,你必须设定 auto.offset.resetearliest ( latest 因为您跳过了消费者启动前生成的消息)。 group.id 负责集团管理。如果你开始用一些 group.id 然后重新启动应用程序或使用相同的 group.id 只会读取新邮件。
对于你的测试,我建议加上 auto.offset.reset -> earliest 和改变 group.id ```
props.put("auto.offset.reset", "earliest")
props.put("group.id", "test123")

另外:你必须记住 `KafkaProducer::send` 退货 `Future<RecordMetadata>` 消息是异步发送的,如果您在 `Future` 将完成的消息可能无法发送。

相关问题