发送到主题时引发timeoutexception

ulydmbyx  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(311)

我使用kafkatemplatebean创建了sender类,通过senderconfiguration类中的一些配置将有效负载发送到topic。
发件人类

@Component
public class Sender {
    private static final Logger LOGGER = (Logger)  LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String>   kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);

        kafkaTemplate.send(topic, "1", payload);
    }
}

,发件人配置类

@Configuration
public class SenderConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}

问题在于发送而不是生产
下面是application.yml文件属性

kafka:
   bootstrap-servers: localhost:9092
topic:
   helloworld: helloworld.t

简单的控制器包含

@RestController
public class Controller {
    protected final static String HELLOWORLD_TOPIC = "helloworld.t";

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public String SendMessage() {

        sender.send(HELLOWORLD_TOPIC, "message");

        return "success";
    }
}

例外是

2017-12-20 09:58:04.645  INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2017-12-20 09:58:04.645  INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2017-12-20 09:59:04.654 ERROR 10816 --- [nio-7060-exec-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a      message with key='1' and payload='message' to topic helloworld.t:

org.apache.kafka.common.errors.TimeoutException: Failed to update   metadata after 60000 ms.
hrysbysz

hrysbysz1#

发生这种错误的可能性很小。
无法使用配置的端口访问kafka broker。
为此,请尝试使用 telnet localhost 9092 如果你得到了输出,那就意味着Kafka·博克正在运行
检查spring boot使用的kafka客户端版本是否与您的kafka版本相同。如果版本不匹配,Kafka可能无法将数据发送到主题
有时经纪人需要时间来了解新创建的主题。因此,生产者可能会失败的错误 Failed to update metadata after 60000 ms. 要解决此问题,请使用kafka命令行选项手动创建kafka。
server.properties的侦听器配置不工作。
你也可以试试这个
将“bootstrap.servers”属性或--broker list选项更改为0.0.0.0:9092
更改2个属性中的server.properties
侦听器=plaintext://your.host.name:9092 to listeners=纯文本://:9092
播发的.listeners=plaintext://your.host.name:9092到adverted.listeners=plaintext://localhost:9092
希望有帮助!

k5hmc34c

k5hmc34c2#

这意味着你的经纪人没有在运作。检查server.log,必要时重新启动代理

sigwle7e

sigwle7e3#

使用包含键的方法

kafkaTemplate.send(topic, key, payload);

不清楚您想要使用什么键值,但是它应该均匀地分布在主题的分区计数中。例如,分区计数范围内的随机数。

相关问题