我想要一些等价于在tomcat中设置此属性的东西。这将设置线程池中的线程数为100,因此可以并发处理的最大请求数为100个请求。
server.tomcat.maxthread=100
字符串
但是在webflux(netty webserver)中,我找不到限制请求的配置属性。
这就是我迄今为止所尝试的
1 -为ReactiveWebServerFactory
定义一个自定义bean。我创建了两个API端点来测试第一个端点是阻塞IO第二个不是,但它一次只能处理一个请求,另一个请求必须等到第一个完成即使它应该运行在不同的线程上。
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
factory.addServerCustomizers(builder -> builder.runOn(new NioEventLoopGroup(2)));
return factory;
}
型
2 -设置这个属性,我发现在文档中,但它仍然可以处理2个以上的不同线程的请求。(我测试了与上述相同的方式)
server.netty.max-keep-alive-requests=2
型
3 -设置tomcat maxthread为2,这一个没有我预期的效果。
server.tomcat.maxthread=2
型
所以是否可以限制webflux中的最大请求数。
我的API用于测试
@RestController
public class CheckController {
@PostMapping("/test")
public Mono<Long> something() throws InterruptedException {
System.out.println("cores " + Runtime.getRuntime().availableProcessors());
return Mono.just(2L)
.flatMap(d -> simulateBlockingOperation())
.log();
}
@PostMapping("/test2")
public Mono<Long> something2() throws InterruptedException {
return Mono.just(1L)
.log();
}
private Mono<Long> simulateBlockingOperation() {
System.out.println("Current thread " + Thread.currentThread().getName());
int x = 0;
while(x!=1) {}
return Mono.just(2L);
}
}
型
最新消息:我临时解决了这个问题,创建另一个线程池,并使用.subscribeOn来切换线程组,并限制线程组上的线程数量。但我觉得这是一种非常肮脏的方式。
@RestController
public class CheckController {
private final Scheduler customThreadPool;
public CheckController(Scheduler customThreadPool) {
this.customThreadPool = customThreadPool;
}
@Bean
public Scheduler reactiveRequestThreadPool() {
return Schedulers.newBoundedElastic(2, 2, "my-custom-thread");
}
@PostMapping("/test")
public Mono<Long> something() throws InterruptedException {
return Mono.just(2L)
.doOnNext(d -> getCurrentThread())
.flatMap(d -> simulateBlockingOperation())
.subscribeOn(customThreadPool)
.log();
}
@PostMapping("/test2")
public Mono<Long> something2() throws InterruptedException {
return Mono.just(1L)
.doOnNext(d -> getCurrentThread())
.subscribeOn(customThreadPool)
.log();
}
private Mono<Long> simulateBlockingOperation() {
int x = 0;
while(x!=1) {}
return Mono.just(2L);
}
private void getCurrentThread() {
System.out.println("current thread " + Thread.currentThread().getName());
}
}
型
1条答案
按热度按时间ngynwnxp1#
这里有一个想法给你。创建一个过滤器,将覆盖所有的API由您的服务器。在该过滤器有一个静态原子计数器的当前请求。每次请求来检查计数器,如果它不大于你的限制,允许请求和递增计数器。如果它大于你的限制拒绝请求。每次请求完成,响应回来通过你的过滤器-递减计数器。