使用函数回复rpc请求

dxxyhpgq  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(373)

我想使用 java.util.Function 通过发送回复请求的方法 RabbitTemplate.convertSendAndReceive . 它可以和 RabbitListener 但我不能让它与函数方法一起工作。
客户(工作)

class Client(private val template RabbitTemplate) {

    fun send() = template.convertSendAndReceive(
        "rpc-exchange",
        "rpc-routing-key",
        "payload message"
    )

}

服务器(方法1,工作)

class Server {

    @RabbitListener(queues = ["rpc-queue"])
    fun receiveRequest(message: String) = "Response Message"

    @Bean
    fun queue(): Queue {
        return Queue("rpc-queue")
    }

    @Bean
    fun exchange(): DirectExchange {
        return DirectExchange("rpc-exchange")
    }

    @Bean
    fun binding(exchange: DirectExchange, queue: Queue): Binding {
        return BindingBuilder.bind(queue).to(exchange).with("rpc-routing-key")
    }

}

服务器(方法2,不工作)-->目标

class Server {

    @Bean
    fun receiveRequest(): Function<String, String> {
        return Function { value: String ->
            "Response Message"
        }
    }

}

配置(方法2)

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.binding.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.binding.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

使用方法2,服务器接收。不幸的是,响应丢失。有人知道如何在函数方法中使用rpc模式吗?我不想使用 RabbitListener .
请参阅文档/教程。

zf9nrax1

zf9nrax11#

springcloudstream并不是真正为服务器端的rpc设计的,所以它不会像以前那样自动处理这个问题 @RabbitListener 做。
但是,您可以通过添加输出绑定来实现这一点,以将应答路由到默认的交换和 replyTo 标题:

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

spring.cloud.stream.bindings.receiveRequest-out-0.destination=
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression=headers['amqp_replyTo']

# logging.level.org.springframework.amqp=debug
@SpringBootApplication
public class So66586230Application {

    public static void main(String[] args) {
        SpringApplication.run(So66586230Application.class, args);
    }

    @Bean
    Function<String, String> receiveRequest() {
        return str -> {
            return str.toUpperCase();
        };
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            System.out.println(new String((byte[]) template.convertSendAndReceive(
                    "rpc-exchange",
                    "rpc-routing-key",
                    "payload message")));
        };
    }

}
PAYLOAD MESSAGE

请注意,答复将作为 byte[] ; 可以使用模板上的自定义消息转换器将其转换为字符串。
编辑
作为对以下第三条评论的答复。
这个 RabbitTemplate 默认情况下使用直接应答,因此应答地址不是真正的队列,而是由绑定器创建并与模板中的使用者关联的伪队列。
您也可以将模板配置为使用临时回复队列,但它们也会由默认的exchange“”路由到。
但是,您可以使用模板作为侦听器来配置外部应答容器。
然后,您可以使用所需的任何交换和路由密钥路由回。
综合起来:

spring.cloud.function.definition: receiveRequest
spring.cloud.stream.bindings.receiveRequest-in-0.destination: rpc-exchange
spring.cloud.stream.bindings.receiveRequest-in-0.group: rpc-queue
spring.cloud.stream.rabbit.bindings.receiveRequest-in-0.consumer.bindingRoutingKey: rpc-routing-key

spring.cloud.stream.bindings.receiveRequest-out-0.destination=reply-exchange
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.routing-key-expression='reply-routing-key'
spring.cloud.stream.rabbit.bindings.receiveRequest-out-0.producer.declare-exchange=false

spring.rabbitmq.template.reply-timeout=10000

# logging.level.org.springframework.amqp=debug
public class So66586230Application {

    public static void main(String[] args) {
        SpringApplication.run(So66586230Application.class, args);
    }

    @Bean
    Function<String, String> receiveRequest() {
        return str -> {
            return str.toUpperCase();
        };
    }

    @Bean
    SimpleMessageListenerContainer replyContainer(SimpleRabbitListenerContainerFactory factory,
            RabbitTemplate template) {

        template.setReplyAddress("reply-queue");
        SimpleMessageListenerContainer container = factory.createListenerContainer();
        container.setQueueNames("reply-queue");
        container.setMessageListener(template);
        return container;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, SimpleMessageListenerContainer replyContainer) {
        return args -> {
            System.out.println(new String((byte[]) template.convertSendAndReceive(
                    "rpc-exchange",
                    "rpc-routing-key",
                    "payload message")));
        };
    }

}

重要提示:如果您有多个客户端示例,那么每个示例都需要自己的应答队列。
在这种情况下,路由密钥必须是队列名称,您应该返回到上一个示例以设置路由密钥表达式(从报头获取队列名称)。

相关问题