java 使用Spring Cloud Streams功能的路由条件

cig3rfwq  于 5个月前  发布在  Java
关注(0)|答案(1)|浏览(57)

在旧的命令式编程类型被弃用后,我遇到了一些问题。我有两个微服务(一个作为发布者,另一个作为订阅者),在旧的方式下,通过注解@StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'"),我可以让两个函数只监听记录,现在我不知道如何做到这一点。
我正在阅读所有文档event routing并尝试使用routing-expression,但两个消费者正在阅读所有记录。
第一个微服务器的应用yaml:

spring:
  cloud:
    stream:
      bindings:
        output: 
          destination: topicEvents

字符串
第二个应用yaml是:

spring:
  cloud:
    function:
      routing-expression: headers['type']
      definition: consumerPermissionEvent;consumerApiEvent
    stream:
      bindings:
        consumerPermissionEvent-in-0:
          destination: topicUsers
        consumerApiEvent-in-0:
          destination: topicUsers


我从第一个微服务发送这样的消息:

@Autowired
private StreamBridge bridge;

public void send(PermissionEvent event){
    Message<PermissionEvent> message = MessageBuilder.withPayload(event)
            .setHeader("type","consumerPermissionEvent").build();
    bridge.send("output", message);
}


第二个微服务有两个消费者:

@Bean
    public Consumer<Message<ApiEvent>> consumerApiEvent() {
        return e -> log.debug("READED API EVENT: {}", e.getPayload());
    }

    @Bean
    public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
        return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
    }


第二个微服务的输出日志:

[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)


有什么办法吗?
Thanks in advance

q43xntqr

q43xntqr1#

你将需要启用路由首先通过使用以下属性:

--spring.cloud.stream.function.routing.enabled=true

字符串
有关更多详细信息,请参阅https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function

相关问题