kafka没有可用作参数的确认

gkl3eglg  于 2021-07-26  发布在  Java
关注(0)|答案(0)|浏览(1499)

我们正试图在JavaSpring项目中实现kafka确认。在没有确认的情况下,我们成功地接收并读取了消息,但是当我们在方法中添加确认时,我们会收到以下错误:

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: 
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
 nested exception is java.lang.IllegalStateException:
 No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
 Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from 
 [proto.DeviceModelOuterClass$DevModel] to
 [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"

我们实现确认的方式如kafka api中所述:

@KafkaListener(
      id = Constants.TOPIC_LISTENER,
      topics = "${info.dev.name}",
      autoStartup = "false",
     properties = {
        "value.deserializer=com.cit.iomt.core.DevModelDeserializer",
        "key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
      })
  public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
    LOG.info(Constants.READ_KAFKA_TOPIC, message);
 a.acknowledge();}

在属性文件中我们有:

spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题