kafka 0.9无法使生产者和消费者api正常工作

3okqufwl  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(243)

正在尝试新的0.9使用者api和生产者api。但似乎无法让它工作。我有一个生产者,生产100条消息,每个分区的主题与两个分区。我的代码读起来像

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "1");
    props.put("retries", 0);
    props.put("batch.size", 2);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    System.out.println("Created producer");
    for (int i = 0; i < 100; i++) {
      for (int j = 0; j < 2; j++) {
        producer.send(new ProducerRecord<>("burrow_test_2", j, "M_"+ i + "_" + j + "_msg",
            "M_" + i + "_" + j + "_msg"));
        System.out.println("Sending msg " + i + " into partition " + j);
      }
      System.out.println("Sent 200 msgs");
    }

    System.out.println("Closing producer");
    producer.close();
    System.out.println("Closed producer");

现在producer.close需要很长时间才能关闭,我假设所有缓冲区都会被刷新,消息会被写入日志的尾部。
现在我的消费者,我想在退出前阅读指定数量的消息。我选择在阅读时手动提交偏移量。代码读起来像

int noOfMessagesToRead = Integer.parseInt(args[0]);
    String groupName = args[1];

    System.out.println("Reading " + noOfMessagesToRead + " for group " + groupName);
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", groupName);
    //props.put("enable.auto.commit", "true");
    //props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("burrow_test_2"));
    //consumer.seekToEnd(new TopicPartition("burrow_test_2", 0), new TopicPartition("burrow_test_2", 1));
    int noOfMessagesRead = 0;
    while (noOfMessagesRead != noOfMessagesToRead) {
      System.out.println("Polling....");
      ConsumerRecords<String, String> records = consumer.poll(0);
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(),
            record.value());
        noOfMessagesRead++;
      }
      consumer.commitSync();
    }
  }

但是我的消费者总是停留在轮询调用上(即使我将timeout指定为0,也从不返回)。
现在,为了确认,我尝试从kafka提供的命令行控制台consumer消费 bin/kafka-console-consumer.sh --from-beginning --new-consumer --topic burrow_test_2 --bootstrap-server localhost:9092 .
现在这似乎是只读的,只使用在我启动java producer之前生成的消息。
所以问题是
上面的java producer有什么问题
为什么消费者没有轮询任何东西(我有一些旧消息,控制台消费者可以很好地读取它们)。
更新:我的坏。我启用了从localhost到运行kafka,zk的远程机器的端口转发。在这种情况下,似乎有一些问题。在localhost上运行似乎产生了消息。
我唯一剩下的问题是,对于消费者api,我无法 seek 偏移量。我两个都试过了 seekToEnd 以及 seekToBeginning 方法都抛出了一个异常,表示未找到任何记录。那么,消费者寻求补偿的途径是什么呢。是 auto.offset.reset 消费者财产是唯一的选择?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题