connectexception

x9ybnkn6  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(324)

我有两个kafka代理:server1:9092和server2:9092我正在使用一个java客户机向这个集群发送消息,代码如下:

@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {

    //See at http://kafka.apache.org/documentation.html#newproducerconfigs
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "server1:9092,server2:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

    ProducerRecord<String, String> myRecord =
            new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");

    boolean syncSend = true;

    if (syncSend) {
        //Synchronously send
        producer.send(myRecord).get();
    } else {
        //Asynchronously send
        producer.send(myRecord);
    }
    producer.close();
}

当某个代理关闭时,测试在某些情况下会引发此异常(在此异常示例中,“server1”已关闭):
2015-11-02 17:59:29,138 warn[org.apache.kafka.common.network.selector]server1/40.35.250.227 java.net.connectexception的i/o出错:连接被拒绝:sun.nio.ch.socketchannelimpl.checkconnect(本机方法)的sun.nio.ch.socketchannelimpl.finishconnect(socketchannelimpl)没有进一步的信息。java:717)在org.apache.kafka.common.network.selector.poll(选择器。java:238)在org.apache.kafka.clients.networkclient.poll(networkclient。java:192)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:191)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:122)在java.lang.thread.run(线程。java:745)

iyr7buue

iyr7buue1#

这就是我解决问题的方法:
至少需要3个zookeeper节点,我必须再配置一个。这是因为zk确定leader的方式需要50%以上的节点启动和运行。
将此参数添加到zookeeper属性文件:
时间=200
使用其他参数时需要此参数:

initLimit=5
syncLimit=2

在producer中添加此属性:
props.setproperty(producerconfig.reconnect\u backoff\u ms\u config,“10000”);
"RECONNECT_BACKOFF_MS_CONFIG" 属性只抛出一次警告(不是无限循环),然后发送消息

bybem2ql

bybem2ql2#

我遇到了这个问题,结果发现原因是对一个新的配置属性的误解。
在从以前的producerapi迁移时,我寻找了一个等价的“topic.metadata.refresh.interval.ms”,并选择了producerconfig.metadata\u fetch\u timeout\u config。但是,这是在尝试访问元数据被视为失败之前的超时,因为我将其设置为几分钟,所以它阻止了故障转移的发生。
将此值设置为较低的值(我选择500毫秒)似乎解决了我的问题。
我相信我最初寻找的值是producerconfig.metadata\u max\u age\u config,因为无论是否发生故障,在刷新元数据之前都会超时

相关问题