Kafka制作人.send stops

l3zydbqr  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(259)

下面是mapr关于制作Kafka制作者/消费者的教程:
https://github.com/mapr-demos/kafka-sample-programs
我使用的是hortonworks沙盒虚拟机,版本2.4,Kafka0.9.2这是我的生产者代码:

package com.whatever.kafka.basic;

...

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BasicProducer {

    public static void main(String[] args) throws IOException {

        /* comment-out snippet from MapR example; hard-code properties instead
        KafkaProducer<String, String> producer;
        try (InputStream props = Resources.getResource("producer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            producer = new KafkaProducer<String, String>(properties);
        }
        */

        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("auto.commit.interval.ms", 1000);
        props.put("linger.ms", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("block.on.buffer.full", true);

        final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        try {
            for (int i = 0; i < 1000000; i++) {

                System.out.println("Test 1"); // prints to console and stops on the next line

                producer.send(new ProducerRecord<String, String>(
                    "fast-messages", String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)  
                ));

                System.out.println("Test 2");

                if (i % 250 == 0) {
                    producer.send(new ProducerRecord<String, String>(
                            "fast-messages", String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)
                    ));
                    producer.send(new ProducerRecord<String, String>(
                            "summary-markers", String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)
                    )); 
                    producer.flush();
                    System.out.println("Sent msg number " + i);
                }
            }
            producer.close();
        } catch (Throwable throwable) {
            System.out.printf("%s", throwable.getStackTrace());
        } finally {
            producer.close();
        }
    } 
}

我已经开始学习zookeeper和kafka,并按照说明创建了主题:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic summary-markers

但是当我运行编译后的应用程序

$ ./kafka-example producer

我得到这个输出:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig)
log4j:WARN Please initialize the log4j system properly
Test 1

一开始就卡住了 producer.send 行,在 Test 1 系统输出日志。也不会抛出异常,只是在打印时停止 Test 1 线路。当我使用eclipse使用换行符来调试它时,它也会停在那一行上,显然什么都没有发生。
这个 fast-messages 以及 summary-markers Kafka日志中的目录会被创建,但不会写入任何内容。
你知道吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题