kafka错误处理:processor.output().send(message,kafkatimeoutinms)始终返回true及其异步

fafcakar  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(287)

可能是这个问题已经上报并解决了,我没有找到解决方案,也没有任何关于这个问题的开放性问题,所以创建了一个新的。我正在尝试处理将数据发布到kafka主题时的错误。Kafka的Spring蒸汽,我们是推动Kafka使用这个

if (processor.output().send(messsage , kafkaTimeoutInMS) && acknowledgment != null)
                {
                    LOGGER.debug("Acknowledgment provided");
                    LOGGER.info("Sending to Kafka successful");
                    acknowledgment.acknowledge();
                }
                else
                {
                    LOGGER.error("Sending to Kafka failed", message);
                }

send()方法总是返回true,我试图在调试模式下运行时停止kafka manual,但它仍然返回true。我已经读到它是异步的。
我试着设置

bindings: output: producer: sync: true

这没用。但我看到了一些错误,我不能用我的逻辑来判断是失败还是成功。
我们是手动确认的,因此我们应该只确认它成功发送到主题时,我们需要记录所有失败的消息。有什么建议吗?

kxeu7u2r

kxeu7u2r1#

我相信你误解了Spring Cloud流的工作原理。作为一个框架,用户和框架之间有一定的契约,当涉及到消息传递时,ack、retries、dlq和许多其他方面都会自动处理,以确保用户不必手动暴露于此(正如您正在尝试的那样)。
考虑花点时间浏览一下用户指南-https://docs.spring.io/spring-cloud-stream/docs/fishtown.m3/reference/htmlsingle/
另外,下面是一个非常基本的示例,它将演示用户(开发人员)与框架的典型交互。如您所见,您所做的只是实现一个简单的处理程序,它接收并返回一段数据。其余部分(从kafka接收并发送到kafka或任何其他消息传递系统)由框架提供的绑定器处理。

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication { 

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String echo(String message) {
        return message;
    }
}

相关问题