为什么kafka流没有@streamlistener就不能工作?

t40tm48m  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(298)

@streamlistener现在被弃用了,所以我尝试用函数的方式重写我的使用者。
java代码

@Component
public class MyConsumer {
    public void handleMyMethod(MessageEntity message) {
        ...
    }
}

@RequiredArgsConstructor
@Configuration
public class MyConsumerConfiguration {
    private final MyConsumer myConsumer;

    @Bean
    public Consumer<MessageEntity> myMethod() {
        return myConsumer::handleMyMethod;
    }
}

应用程序.yaml

spring.cloud :
    stream :
        kafka.binder.brokers :
            - "127.0.0.1:9092"
        function.definition : myMethod
        bindings :
            myMethod-in-0 :
                destination : service.method.update

测试

@Test
void handleMyMethod() {
    MessageEntity message = new MessageEntity();
    template.send("service.method.update", message);

    await()
        .atMost(6, TimeUnit.SECONDS)
        .untilAsserted(() -> {
            verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());

            MessageEntity entity = entityCaptor.getValue();
            assertNotNull(entity);
        });
}

运行测试。这条消息正在传递给Kafka(我在Kafka工具中看到它),但我的消费者没有捕捉到它。测试失败,错误为:

myConsumer.handleMethod(
    <Capturing argument>
);
-> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
Actually, there were zero interactions with this mock.```
6ju8rftf

6ju8rftf1#

我发现了问题。我的util模块集成在上面描述的模块中,它有自己的“function.definition”属性。由于上面的模块是在util模块之前配置的,util模块从顶级模块中删除方法。我这样解决问题:

@Autowired
StreamFunctionProperties streamFunctionProperties;

...

String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
    streamFunctionProperties.getDefinition() + ";" :
    "";
streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");

相关问题