Usage and code examples for the io.github.resilience4j.bulkhead.Bulkhead class

Reposted by x33g5p2x on 2022-01-17 in Other

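This article collects code examples for the Java class io.github.resilience4j.bulkhead.Bulkhead and shows how the class is used. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, are extracted from selected projects, and are intended as a practical reference. Details of the Bulkhead class:
Package path: io.github.resilience4j.bulkhead.Bulkhead
Class name: Bulkhead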

About Bulkhead

A Bulkhead instance is thread-safe and can be used to decorate multiple requests. A Bulkhead represents an entity that limits the number of parallel operations. It neither assumes nor mandates any particular concurrency or I/O model; these details are left for the client to manage. Depending on the underlying concurrency/I/O model, the bulkhead can be used to shed load and, where it makes sense, to limit resource use (e.g. the number of threads or actors involved in a particular flow). To execute an operation protected by the bulkhead, permission must first be obtained by calling Bulkhead#isCallPermitted(). If the bulkhead is full, no additional operations are permitted to execute until space becomes available. Once the operation is complete, regardless of the result, the client needs to call Bulkhead#onComplete() to maintain the integrity of the internal bulkhead state.
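The permit protocol described above (obtain permission, run the operation, always signal completion) can be illustrated with a minimal, dependency-free sketch. `SimpleBulkhead` is a hypothetical stand-in built on `java.util.concurrent.Semaphore`. It is not the resilience4j implementation, and its method names only mirror the roles of `isCallPermitted()` and `onComplete()`. The `IllegalStateException` thrown when full is also an assumption chosen to keep the sketch self-contained.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical stand-in for a bulkhead; NOT the resilience4j implementation. */
final class SimpleBulkhead {
    private final Semaphore permits;

    SimpleBulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    /** Tries to take a permit without blocking (plays the role of isCallPermitted()). */
    boolean isCallPermitted() {
        return permits.tryAcquire();
    }

    /** Gives a permit back (plays the role of onComplete()). */
    void onComplete() {
        permits.release();
    }

    /** Number of calls that could still be admitted right now. */
    int availableConcurrentCalls() {
        return permits.availablePermits();
    }

    /** Wraps a supplier so each invocation takes a permit and always releases it. */
    <T> Supplier<T> decorate(Supplier<T> call) {
        return () -> {
            if (!isCallPermitted()) {
                // Assumption: a plain IllegalStateException stands in for a "bulkhead full" error.
                throw new IllegalStateException("Bulkhead is full");
            }
            try {
                return call.get();
            } finally {
                onComplete(); // released regardless of success or failure
            }
        };
    }

    public static void main(String[] args) {
        SimpleBulkhead bulkhead = new SimpleBulkhead(1);
        Supplier<String> decorated = bulkhead.decorate(() -> "hello");
        System.out.println(decorated.get());                      // prints "hello"
        System.out.println(bulkhead.availableConcurrentCalls());  // prints 1
    }
}
```

The `finally` block is the important design choice: it is what keeps the permit count consistent when the protected call throws.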

Code examples

Code example source: resilience4j/resilience4j

private Object handleOther(MethodInvocation invocation, io.github.resilience4j.bulkhead.Bulkhead bulkhead, RecoveryFunction<?> recoveryFunction) throws Throwable {
  boolean permission = bulkhead.isCallPermitted();

  if (!permission) {
    Throwable t = new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
    return recoveryFunction.apply(t);
  }

  try {
    if (Thread.interrupted()) {
      throw new IllegalStateException("Thread was interrupted during permission wait");
    }

    return invocation.proceed();
  } catch (Exception e) {
    return recoveryFunction.apply(e);
  } finally {
    // Always release the permit, whether the call succeeded or failed
    bulkhead.onComplete();
  }
}

Code example source: resilience4j/resilience4j

private BulkheadMetrics(String prefix, Iterable<Bulkhead> bulkheads) {
  requireNonNull(prefix);
  requireNonNull(bulkheads);
  bulkheads.forEach(bulkhead -> {
    String name = bulkhead.getName();
    // number of available concurrent calls as an integer
    metricRegistry.register(name(prefix, name, AVAILABLE_CONCURRENT_CALLS),
        (Gauge<Integer>) () -> bulkhead.getMetrics().getAvailableConcurrentCalls());
  });
}

Code example source: resilience4j/resilience4j

@Test
public void testCreateWithDefaults() {
  // when
  Bulkhead bulkhead = Bulkhead.ofDefaults("test");
  // then
  assertThat(bulkhead).isNotNull();
  assertThat(bulkhead.getBulkheadConfig()).isNotNull();
}

Code example source: resilience4j/resilience4j

public static void isCallPermitted(Bulkhead bulkhead) {
  if (!bulkhead.isCallPermitted()) {
    throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
  }
}

Code example source: resilience4j/resilience4j

@Test
public void shouldNotReleaseBulkheadWhenWasDisposedAfterNotPermittedSubscribe() throws Exception {
  // Given
  Disposable disposable = mock(Disposable.class);
  MaybeObserver childObserver = mock(MaybeObserver.class);
  MaybeObserver decoratedObserver = BulkheadOperator.of(bulkhead).apply(childObserver);
  bulkhead.isCallPermitted();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
  decoratedObserver.onSubscribe(disposable);
  // When
  ((Disposable) decoratedObserver).dispose();
  // Then
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
}

Code example source: resilience4j/resilience4j

@Test
public void testBulkhead() throws InterruptedException {
  bulkhead.isCallPermitted();
  bulkhead.isCallPermitted();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
  bulkhead.isCallPermitted();
  bulkhead.onComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
  bulkhead.onComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(2);
  bulkhead.isCallPermitted();
  testSubscriber.assertValueCount(6)
         .assertValues(CALL_PERMITTED, CALL_PERMITTED, CALL_REJECTED, CALL_FINISHED, CALL_FINISHED, CALL_PERMITTED);
}

Code example source: resilience4j/resilience4j

private void releaseBulkhead() {
  // Release only once: the compare-and-set guards against a double release
  if (permitted.compareAndSet(Permit.ACQUIRED, Permit.RELEASED)) {
    bulkhead.onComplete();
  }
}

Code example source: resilience4j/resilience4j

@Override
protected Throwable getThrowable() {
  return new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
}

Code example source: resilience4j/resilience4j

@Test
public void shouldReturnFailureWithRuntimeException() {
  // Given
  BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
  Bulkhead bulkhead = Bulkhead.of("test", config);
  bulkhead.isCallPermitted();
  // When
  CheckedRunnable checkedRunnable = Bulkhead.decorateCheckedRunnable(bulkhead, () -> { throw new RuntimeException("BAM!"); });
  Try result = Try.run(checkedRunnable);
  // Then
  assertThat(result.isFailure()).isTrue();
  assertThat(result.failed().get()).isInstanceOf(RuntimeException.class);
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldReturnTheCorrectName() {
  Bulkhead bulkhead = registry.bulkhead("test");
  assertThat(bulkhead).isNotNull();
  assertThat(bulkhead.getName()).isEqualTo("test");
  assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(25);
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(25);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldConsumeOnCallRejectedEvent() {
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);
  // When
  bulkhead.getEventPublisher()
      .onCallRejected(event ->
          logger.info(event.getEventType().toString()));
  bulkhead.isCallPermitted();
  Try.ofSupplier(Bulkhead.decorateSupplier(bulkhead, helloWorldService::returnHelloWorld));
  // Then
  then(logger).should(times(1)).info("CALL_REJECTED");
}

Code example source: resilience4j/resilience4j

@Test
public void shouldInvokeAsyncApply() throws ExecutionException, InterruptedException {
  // tag::shouldInvokeAsyncApply[]
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);
  // When
  Supplier<String> decoratedSupplier = Bulkhead.decorateSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
  CompletableFuture<String> future = CompletableFuture.supplyAsync(decoratedSupplier)
                            .thenApply(value -> value + " world'");
  String result = future.get();
  // Then
  assertThat(result).isEqualTo("This can be any method which returns: 'Hello world'");
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
  // end::shouldInvokeAsyncApply[]
}

Code example source: resilience4j/resilience4j

@Test
public void shouldDecorateCompletionStageAndReturnWithExceptionAtAsyncStage() throws ExecutionException, InterruptedException {
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);
  BDDMockito.given(helloWorldService.returnHelloWorld()).willThrow(new RuntimeException("BAM! At async stage"));
  // When
  Supplier<CompletionStage<String>> completionStageSupplier =
      () -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);
  Supplier<CompletionStage<String>> decoratedCompletionStageSupplier =
      Bulkhead.decorateCompletionStage(bulkhead, completionStageSupplier);
  CompletionStage<String> decoratedCompletionStage = decoratedCompletionStageSupplier.get();
  // Then the helloWorldService should be invoked 1 time
  assertThatThrownBy(decoratedCompletionStage.toCompletableFuture()::get)
      .isInstanceOf(ExecutionException.class).hasCause(new RuntimeException("BAM! At async stage"));
  BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

// ... (preceding lines omitted in the excerpt)
);
chain1.get("stream/events", ctx -> {
  Seq<Flux<BulkheadEvent>> eventStreams = bulkheadRegistry.getAllBulkheads().map(bulkhead -> ReactorAdapter.toFlux(bulkhead.getEventPublisher()));
  Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
  ServerSentEvents events = ServerSentEvents.serverSentEvents(Flux.merge(eventStreams), e -> e.id(BulkheadEvent::getBulkheadName).event(c -> c.getEventType().name()).data(data));
  // ... (remainder of this handler omitted in the excerpt)
chain1.get("stream/events/:name", ctx -> {
  String bulkheadName = ctx.getPathTokens().get("name");
  Bulkhead bulkhead = bulkheadRegistry.getAllBulkheads().find(b -> b.getName().equals(bulkheadName))
      .getOrElseThrow(() -> new IllegalArgumentException(String.format("bulkhead with name %s not found", bulkheadName)));
  Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
  ServerSentEvents events = ServerSentEvents.serverSentEvents(ReactorAdapter.toFlux(bulkhead.getEventPublisher()), e -> e.id(BulkheadEvent::getBulkheadName).event(c -> c.getEventType().name()).data(data));
  ctx.render(events);
});
  // ... (start of the next handler omitted in the excerpt)
  String bulkheadName = ctx.getPathTokens().get("name");
  String eventType = ctx.getPathTokens().get("type");
  Bulkhead bulkhead = bulkheadRegistry.getAllBulkheads().find(b -> b.getName().equals(bulkheadName))
      .getOrElseThrow(() -> new IllegalArgumentException(String.format("bulkhead with name %s not found", bulkheadName)));
  Flux<BulkheadEvent> eventStream = ReactorAdapter.toFlux(bulkhead.getEventPublisher())
      .filter(event -> event.getEventType() == BulkheadEvent.Type.valueOf(eventType.toUpperCase()));
  Function<BulkheadEvent, String> data = b -> Jackson.getObjectWriter(chain1.getRegistry()).writeValueAsString(BulkheadEventDTO.createEventDTO(b));
  // ... (remainder omitted in the excerpt)

Code example source: resilience4j/resilience4j

@Test
public void shouldEmitEvent() {
  StepVerifier.create(
      Mono.just("Event")
          .transform(BulkheadOperator.of(bulkhead)))
      .expectNext("Event")
      .verifyComplete();
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
}

Code example source: resilience4j/resilience4j

@Test
public void shouldDecorateRunnableAndReturnWithSuccess() throws Throwable {
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);
  // When
  Bulkhead.decorateRunnable(bulkhead, helloWorldService::sayHelloWorld)
      .run();
  // Then
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
  BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorld();
}

Code example source: resilience4j/resilience4j

@Override
protected boolean isCallPermitted() {
  return bulkhead.isCallPermitted();
}

Code example source: resilience4j/resilience4j

@Test
public void shouldConsumeOnCallFinishedEventOnComplete() throws Exception {
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);

  // When
  bulkhead.getEventPublisher()
      .onCallFinished(event ->
          logger.info(event.getEventType().toString()));

  bulkhead.onComplete();

  // Then
  then(logger).should(times(1)).info("CALL_FINISHED");
}

Code example source: resilience4j/resilience4j

@Test
public void shouldDecorateConsumerAndReturnWithSuccess() throws Throwable {
  // Given
  Bulkhead bulkhead = Bulkhead.of("test", config);
  // When
  Bulkhead.decorateConsumer(bulkhead, helloWorldService::sayHelloWorldWithName)
      .accept("Tom");
  // Then
  assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
  BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorldWithName("Tom");
}

Code example source: resilience4j/resilience4j

@Test(expected = NullPointerException.class)
public void testConstructorWithNullName() {
  BulkheadExports.ofSupplier(null, () -> singleton(Bulkhead.ofDefaults("foo")));
}
