为@kafkalistener创建元注解

eyh26e7m  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(420)

我试图创建基于@kafkalistener注解的自定义元注解,但是spring kafka抛出了一个错误。
以下是我的注解:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MessageListener {
  @AliasFor(annotation = KafkaListener.class, attribute = "topics")
  String[] value() default {};
}

我的客户服务:

@EnableKafka
public class CarSubscriber extends BaseSubscriber<String, Car> {
    private static final Logger LOGGER = 
    LoggerFactory.getLogger(CarSubscriber.class);

    @MyMessageListener("${kafka.topic.car}")
    public void receiveMessage(Car car) {

       LOGGER.info("received car=" + car.toString());
       getLatch().countDown();
    }
}

错误堆栈跟踪:

java.lang.IllegalArgumentException: An array of topicPartitions must be provided
at org.springframework.util.Assert.notEmpty(Assert.java:228) ~[spring-core-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.kafka.listener.config.ContainerProperties.<init>(ContainerProperties.java:175) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.<init>(AbstractMessageListenerContainer.java:130) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.<init>(ConcurrentMessageListenerContainer.java:69) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:70) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:40) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:177) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:46) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:230) ~[spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.9.RELEASE.jar:4.3.9.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107) [spring-boot-1.5.4.RELEASE.jar:1.5.4.RELEASE]

实际上,我想在meta注解中指定主题名,换句话说就是在@mymessagelistener中,如上所示。但它不起作用。
是我做错了什么,还是spring-kafka只是不支持@aliasforannotations for@kafkalistener?请帮忙。

pgvzfuti

pgvzfuti1#

您必须使用@mymessagelistener(topics=“${kafka.topic.car}”)而不是
@mymessagelistener(${kafka.topic.car})。

相关问题