在旧的命令式编程类型被弃用后,我遇到了一些问题。我有两个微服务(一个作为发布者,另一个作为订阅者),在旧的方式下,通过注解@StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'")
,我可以让两个函数只监听记录,现在我不知道如何做到这一点。
我正在阅读所有文档event routing并尝试使用routing-expression,但两个消费者正在阅读所有记录。
第一个微服务器的应用yaml:
spring:
cloud:
stream:
bindings:
output:
destination: topicEvents
字符串
第二个应用yaml是:
spring:
cloud:
function:
routing-expression: headers['type']
definition: consumerPermissionEvent;consumerApiEvent
stream:
bindings:
consumerPermissionEvent-in-0:
destination: topicUsers
consumerApiEvent-in-0:
destination: topicUsers
型
我从第一个微服务发送这样的消息:
@Autowired
private StreamBridge bridge;
public void send(PermissionEvent event){
Message<PermissionEvent> message = MessageBuilder.withPayload(event)
.setHeader("type","consumerPermissionEvent").build();
bridge.send("output", message);
}
型
第二个微服务有两个消费者:
@Bean
public Consumer<Message<ApiEvent>> consumerApiEvent() {
return e -> log.debug("READED API EVENT: {}", e.getPayload());
}
@Bean
public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
}
型
第二个微服务的输出日志:
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)
型
有什么办法吗?
Thanks in advance
1条答案
按热度按时间q43xntqr1#
你将需要启用路由首先通过使用以下属性:
字符串
有关更多详细信息,请参阅https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function