如何限制Spring webflux(netty webserver)中可以处理的请求数量

vc9ivgsu  于 4个月前  发布在  Spring
关注(0)|答案(1)|浏览(85)

我想要一些等价于在tomcat中设置此属性的东西。这将设置线程池中的线程数为100,因此可以并发处理的最大请求数为100个请求。

server.tomcat.maxthread=100

字符串
但是在webflux(netty webserver)中,我找不到限制请求的配置属性。
这就是我迄今为止所尝试的
1 -为ReactiveWebServerFactory定义一个自定义bean。我创建了两个API端点来测试第一个端点是阻塞IO第二个不是,但它一次只能处理一个请求,另一个请求必须等到第一个完成即使它应该运行在不同的线程上。

@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
        NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
        factory.addServerCustomizers(builder -> builder.runOn(new NioEventLoopGroup(2)));

        return factory;
}


2 -设置这个属性,我发现在文档中,但它仍然可以处理2个以上的不同线程的请求。(我测试了与上述相同的方式)

server.netty.max-keep-alive-requests=2


3 -设置tomcat maxthread为2,这一个没有我预期的效果。

server.tomcat.maxthread=2


所以是否可以限制webflux中的最大请求数。
我的API用于测试

@RestController
public class CheckController {

    @PostMapping("/test")
    public Mono<Long> something() throws InterruptedException {
        System.out.println("cores " + Runtime.getRuntime().availableProcessors());
        return Mono.just(2L)
                .flatMap(d -> simulateBlockingOperation())
                .log();
    }

    @PostMapping("/test2")
    public Mono<Long> something2() throws InterruptedException {
        return Mono.just(1L)
                .log();
    }

    private Mono<Long> simulateBlockingOperation() {
        System.out.println("Current thread " + Thread.currentThread().getName());
        int x = 0;
        while(x!=1) {}
        return Mono.just(2L);
    }
}


最新消息:我临时解决了这个问题,创建另一个线程池,并使用.subscribeOn来切换线程组,并限制线程组上的线程数量。但我觉得这是一种非常肮脏的方式。

@RestController
public class CheckController {

    private final Scheduler customThreadPool;

    public CheckController(Scheduler customThreadPool) {
        this.customThreadPool = customThreadPool;
    }
   @Bean
   public Scheduler reactiveRequestThreadPool() {
       return Schedulers.newBoundedElastic(2, 2, "my-custom-thread");
   }

    @PostMapping("/test")
    public Mono<Long> something() throws InterruptedException {
        return Mono.just(2L)
                .doOnNext(d -> getCurrentThread())
                .flatMap(d -> simulateBlockingOperation())
                .subscribeOn(customThreadPool)
                .log();
    }

    @PostMapping("/test2")
    public Mono<Long> something2() throws InterruptedException {
        return Mono.just(1L)
                .doOnNext(d -> getCurrentThread())
                .subscribeOn(customThreadPool)
                .log();
    }

    private Mono<Long> simulateBlockingOperation() {
        int x = 0;
        while(x!=1) {}
        return Mono.just(2L);
    }

    private void getCurrentThread() {
        System.out.println("current thread " + Thread.currentThread().getName());
    }
}

ngynwnxp

ngynwnxp1#

这里有一个想法给你。创建一个过滤器,将覆盖所有的API由您的服务器。在该过滤器有一个静态原子计数器的当前请求。每次请求来检查计数器,如果它不大于你的限制,允许请求和递增计数器。如果它大于你的限制拒绝请求。每次请求完成,响应回来通过你的过滤器-递减计数器。

相关问题