我有两个kafka代理:server1:9092和server2:9092我正在使用一个java客户机向这个集群发送消息,代码如下:
@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
//See at http://kafka.apache.org/documentation.html#newproducerconfigs
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"server1:9092,server2:9092");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> myRecord =
new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
boolean syncSend = true;
if (syncSend) {
//Synchronously send
producer.send(myRecord).get();
} else {
//Asynchronously send
producer.send(myRecord);
}
producer.close();
}
当某个代理关闭时,测试在某些情况下会引发此异常(在此异常示例中,“server1”已关闭):
2015-11-02 17:59:29,138 warn[org.apache.kafka.common.network.selector]server1/40.35.250.227 java.net.connectexception的i/o出错:连接被拒绝:sun.nio.ch.socketchannelimpl.checkconnect(本机方法)的sun.nio.ch.socketchannelimpl.finishconnect(socketchannelimpl)没有进一步的信息。java:717)在org.apache.kafka.common.network.selector.poll(选择器。java:238)在org.apache.kafka.clients.networkclient.poll(networkclient。java:192)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:191)在org.apache.kafka.clients.producer.internals.sender.run(sender。java:122)在java.lang.thread.run(线程。java:745)
2条答案
按热度按时间iyr7buue1#
这就是我解决问题的方法:
至少需要3个zookeeper节点,我必须再配置一个。这是因为zk确定leader的方式需要50%以上的节点启动和运行。
将此参数添加到zookeeper属性文件:
时间=200
使用其他参数时需要此参数:
在producer中添加此属性:
props.setproperty(producerconfig.reconnect\u backoff\u ms\u config,“10000”);
与
"RECONNECT_BACKOFF_MS_CONFIG"
属性只抛出一次警告(不是无限循环),然后发送消息bybem2ql2#
我遇到了这个问题,结果发现原因是对一个新的配置属性的误解。
在从以前的producerapi迁移时,我寻找了一个等价的“topic.metadata.refresh.interval.ms”,并选择了producerconfig.metadata\u fetch\u timeout\u config。但是,这是在尝试访问元数据被视为失败之前的超时,因为我将其设置为几分钟,所以它阻止了故障转移的发生。
将此值设置为较低的值(我选择500毫秒)似乎解决了我的问题。
我相信我最初寻找的值是producerconfig.metadata\u max\u age\u config,因为无论是否发生故障,在刷新元数据之前都会超时