reactor.core.publisher.Hooks.resetOnErrorDropped()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(125)

本文整理了Java中reactor.core.publisher.Hooks.resetOnErrorDropped()方法的一些代码示例,展示了Hooks.resetOnErrorDropped()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Hooks.resetOnErrorDropped()方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:resetOnErrorDropped

Hooks.resetOnErrorDropped介绍

[英]Reset the global error-dropped strategy to bubbling the error back.
[中]将全局的错误丢弃(error dropped)策略重置为将错误向上冒泡抛出。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Resets the global {@code onNextDropped}, {@code onErrorDropped} and
 * {@code onOperatorError} hooks by calling the matching {@code Hooks.resetXxx()} methods.
 */
public void unplugHooks() {
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorError();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Subscribes to a single-element {@code Mono} with a completion callback that throws,
 * while an {@code onErrorDropped} hook asserts that any error it receives carries the
 * message "complete". The hook is always reset afterwards.
 */
@Test
public void completeHookErrorDropped() {
  Hooks.onErrorDropped(dropped -> assertTrue(dropped.getMessage().equals("complete")));
  try {
    Mono.just("foo").subscribe(
        value -> {},
        error -> {},
        () -> {
          throw new IllegalStateException("complete");
        });
  }
  finally {
    // Global hook: always restore it, whatever happened during subscription.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Calls {@code OperatorDisposables.validate(null, null, ...)} and expects it to report
 * {@code false}; the installed {@code onErrorDropped} hook asserts that any dropped error
 * is a {@link NullPointerException} with the message "next is null".
 */
@Test
public void validationNull() {
  Hooks.onErrorDropped(dropped ->
      assertThat(dropped)
          .isInstanceOf(NullPointerException.class)
          .hasMessage("next is null"));
  try {
    assertThat(
        OperatorDisposables.validate(
            null,
            null,
            failure -> Operators.onErrorDropped(failure, Context.empty())))
        .isFalse();
  }
  finally {
    // Restore the global hook regardless of the assertion outcome.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Per-test cleanup: disposes the scheduler, resets the global Reactor hooks and the
 * scheduler error handler, then empties the fixture's recording collections.
 */
@After
public void tearDown() {
  scheduler.dispose();
  // Undo any global hooks a test may have installed.
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorError();
  Schedulers.resetOnHandleError();
  // Clear recorded signals (presumably filled by hooks installed in setup — confirm).
  onNexts.clear();
  onErrors.clear();
  onNextDropped.clear();
  onErrorDropped.clear();
  onOperatorError.clear();
  onOperatorErrorData.clear();
  onSchedulerHandleError.clear();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks that a {@code resumeDropIf} strategy whose predicate matches
 * {@link NullPointerException} accepts such an error, returns {@code null} from
 * {@code process}, and that both the error and the value are seen by the dropped hooks.
 */
@Test
public void resumeDropIfMatch() {
  AtomicReference<Throwable> droppedError = new AtomicReference<>();
  AtomicReference<Object> droppedValue = new AtomicReference<>();
  Hooks.onErrorDropped(droppedError::set);
  Hooks.onNextDropped(droppedValue::set);
  try {
    OnNextFailureStrategy strategy =
        OnNextFailureStrategy.resumeDropIf(NullPointerException.class::isInstance);
    String element = "foo";
    Throwable failure = new NullPointerException("foo");

    assertThat(strategy.test(failure, element)).isTrue();

    Throwable processed = strategy.process(failure, element, Context.empty());
    assertThat(processed).isNull();
    assertThat(droppedError.get())
        .isInstanceOf(NullPointerException.class)
        .hasMessage("foo");
    assertThat(droppedValue.get()).isEqualTo("foo");
  }
  finally {
    // Both hooks are global; restore them whatever the outcome.
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks that the {@code resumeDrop} strategy accepts the error, returns {@code null}
 * from {@code process}, and that both the error and the value are seen by the
 * dropped hooks.
 */
@Test
public void resumeDrop() {
  AtomicReference<Throwable> droppedError = new AtomicReference<>();
  AtomicReference<Object> droppedValue = new AtomicReference<>();
  Hooks.onErrorDropped(droppedError::set);
  Hooks.onNextDropped(droppedValue::set);
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String element = "foo";
    Throwable failure = new NullPointerException("foo");

    assertThat(strategy.test(failure, element)).isTrue();

    Throwable processed = strategy.process(failure, element, Context.empty());
    assertThat(processed).isNull();
    assertThat(droppedError.get())
        .isInstanceOf(NullPointerException.class)
        .hasMessage("foo");
    assertThat(droppedValue.get()).isEqualTo("foo");
  }
  finally {
    // Both hooks are global; restore them whatever the outcome.
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Installs an {@code onErrorDropped} hook that itself throws, then checks that
 * {@code resumeDrop().process(...)} returns the hook's failure with the original error
 * attached as suppressed, and that the value still reached the {@code onNextDropped} hook.
 */
@Test
public void resumeDropErrorHookFails() {
  AtomicReference<Object> droppedValue = new AtomicReference<>();
  UnsupportedOperationException hookFailure = new UnsupportedOperationException("error hook");
  Hooks.onNextDropped(droppedValue::set);
  Hooks.onErrorDropped(ignored -> { throw hookFailure; });
  try {
    OnNextFailureStrategy strategy = OnNextFailureStrategy.resumeDrop();
    String element = "foo";
    Throwable original = new NullPointerException("foo");

    Throwable processed = strategy.process(original, element, Context.empty());

    assertThat(processed)
        .hasMessage("error hook")
        .hasSuppressedException(original);
    assertThat(droppedValue.get()).isEqualTo("foo");
  }
  finally {
    // Both hooks are global; restore them whatever the outcome.
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Signals two errors to a {@code flatMap} subscriber and verifies that the sequence
 * terminates with the first one ("test"). The {@code onErrorDropped} hook asserts that
 * any error it receives carries the message "test2".
 */
@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
  Hooks.onErrorDropped(e -> {
    assertThat(e).hasMessage("test2");
  });
  try {
    StepVerifier.create(Flux.from(s -> {
      s.onSubscribe(Operators.emptySubscription());
      s.onError(new Exception("test"));
      s.onError(new Exception("test2"));
    }).flatMap(Flux::just))
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * With no global {@code onNextError} hook installed, passes a {@code Context} carrying
 * {@code RESUME_DROP} under {@code KEY_ON_NEXT_ERROR_STRATEGY} to
 * {@code Operators.onNextPollError} and checks that it returns {@code null} while the
 * value and the error are both recorded by the dropped hooks.
 */
@Test
public void pollErrorModeLocalStrategy() {
  List<Object> droppedValues = new ArrayList<>();
  List<Object> droppedErrors = new ArrayList<>();
  Hooks.onNextDropped(droppedValues::add);
  Hooks.onErrorDropped(droppedErrors::add);

  Context localStrategy = Context.of(
      OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
      OnNextFailureStrategy.RESUME_DROP);
  Exception failure = new IllegalStateException("boom");

  try {
    assertThat(Hooks.onNextErrorHook).as("no global hook").isNull();

    RuntimeException result = Operators.onNextPollError("foo", failure, localStrategy);

    assertThat(result).isNull();
    assertThat(droppedValues).containsExactly("foo");
    assertThat(droppedErrors).containsExactly(failure);
  }
  finally {
    // Both hooks are global; restore them whatever the outcome.
    Hooks.resetOnNextDropped();
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Resets the global {@code onErrorDropped}, {@code onNextDropped}, {@code onEachOperator},
 * {@code onOperatorError} and {@code onLastOperator} hooks by calling the matching
 * {@code Hooks.resetXxx()} methods.
 */
final void resetHooks() {
  Hooks.resetOnErrorDropped();
  Hooks.resetOnNextDropped();
  Hooks.resetOnEachOperator();
  Hooks.resetOnOperatorError();
  Hooks.resetOnLastOperator();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Drives a {@code flatMap} (concurrency 1, zero initial demand) by hand: emits a value,
 * marks the main subscriber as terminated, emits another value, clears the error field
 * and finally signals "test". Verifies the sequence fails with "test" and that the
 * {@code onErrorDropped} hook was invoked with an overflow error.
 */
@Test
public void failOverflowScalarThenError() {
  AtomicBoolean set = new AtomicBoolean();
  Hooks.onErrorDropped(e -> {
    assertThat(Exceptions.isOverflow(e)).isTrue();
    set.set(true);
  });
  try {
    StepVerifier.create(Flux.from(s -> {
      s.onSubscribe(Operators.emptySubscription());
      s.onNext(1);
      Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
      s.onNext(2);
      ((FluxFlatMap.FlatMapMain)s).error = null;
      s.onError(new Exception("test"));
    })
               .flatMap(Flux::just, 1), 0)
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
  // The hook must have fired at least once.
  assertThat(set.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Same scenario as the scalar overflow case, but with {@code wip} forced to 1 to simulate
 * a concurrently active drain: two extra values are emitted, the error field is cleared,
 * the drain loop is run manually and "test" is signalled. Verifies that 1 is delivered,
 * the sequence fails with "test", and the {@code onErrorDropped} hook saw an overflow error.
 */
@Test
public void failOverflowWhileActiveScalarThenError() {
  AtomicBoolean set = new AtomicBoolean();
  Hooks.onErrorDropped(e -> {
    assertThat(Exceptions.isOverflow(e)).isTrue();
    set.set(true);
  });
  try {
    StepVerifier.create(Flux.from(s -> {
      s.onSubscribe(Operators.emptySubscription());
      s.onNext(1);
      Exceptions.terminate(FluxFlatMap.FlatMapMain.ERROR, (FluxFlatMap.FlatMapMain) s);
      ((FluxFlatMap.FlatMapMain)s).wip = 1; //simulate concurrent active
      s.onNext(2);
      s.onNext(3);
      ((FluxFlatMap.FlatMapMain)s).error = null;
      ((FluxFlatMap.FlatMapMain)s).drainLoop();
      s.onError(new Exception("test"));
    })
                .flatMap(Flux::just, 1), 1)
          .expectNext(1)
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
  // The hook must have fired at least once.
  assertThat(set.get()).isTrue();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Zips three sources, the last of which signals two errors in a row, and verifies that
 * the zipped sequence fails with the first one ("test"). A no-op {@code onErrorDropped}
 * hook is installed for the duration of the test.
 */
@Test //FIXME use Violation.NO_CLEANUP_ON_TERMINATE
public void failDoubleErrorSilent() {
  Hooks.onErrorDropped(e -> {
  });
  try {
    StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), Flux.never(), s -> {
      s.onSubscribe(Operators.emptySubscription());
      s.onError(new Exception("test"));
      s.onError(new Exception("test2"));
    }))
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Signals two errors to a {@code take(2)} subscriber, running the trackable-state
 * assertions around each step, and verifies that the sequence fails with "test".
 * The {@code onErrorDropped} hook asserts that any error it receives is "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failDoubleError() {
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  try {
    StepVerifier.create(Flux.from(s -> {
      assertTrackableBeforeOnSubscribe((InnerOperator)s);
      s.onSubscribe(Operators.emptySubscription());
      assertTrackableAfterOnSubscribe((InnerOperator)s);
      s.onError(new Exception("test"));
      assertTrackableAfterOnComplete((InnerOperator)s);
      s.onError(new Exception("test2"));
    })
                .take(2))
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Variant of the double-error test with a {@code filter} appended after {@code take(2)}:
 * signals two errors to the subscriber, running the trackable-state assertions around
 * each step, and verifies that the sequence fails with "test". The {@code onErrorDropped}
 * hook asserts that any error it receives is "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failConditionalDoubleError() {
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  try {
    StepVerifier.create(Flux.from(s -> {
      assertTrackableBeforeOnSubscribe((InnerOperator)s);
      s.onSubscribe(Operators.emptySubscription());
      assertTrackableAfterOnSubscribe((InnerOperator)s);
      s.onError(new Exception("test"));
      assertTrackableAfterOnComplete((InnerOperator)s);
      s.onError(new Exception("test2"));
    })
                .take(2).filter(d -> true))
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Per-test cleanup: resets the global {@code onOperatorError}, {@code onNextDropped},
 * {@code onErrorDropped}, {@code onOperatorDebug}, {@code onEachOperator} and
 * {@code onLastOperator} hooks by calling the matching {@code Hooks.resetXxx()} methods.
 */
@After
public void resetAllHooks() {
  Hooks.resetOnOperatorError();
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorDebug();
  Hooks.resetOnEachOperator();
  Hooks.resetOnLastOperator();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Zips three sources where the last one completes and then pushes an error directly into
 * the subscriber registered on the {@code DirectProcessor}; verifies that the zipped
 * sequence still completes. A no-op {@code onErrorDropped} hook is installed meanwhile.
 */
@Test
public void failDoubleTerminalPublisher() {
  DirectProcessor<Integer> processor = DirectProcessor.create();
  Hooks.onErrorDropped(ignored -> {
  });
  try {
    StepVerifier.create(Flux.zip(obj -> 0, Flux.just(1), processor, s -> {
      // Grab the subscriber that zip attached to the processor.
      CoreSubscriber<?> processorSubscriber =
          ((DirectProcessor.DirectInner) processor.inners().findFirst().get())
              .actual;
      s.onSubscribe(Operators.emptySubscription());
      s.onComplete();
      // Second terminal signal, delivered after completion.
      processorSubscriber.onError(new Exception("test"));
    }))
          .verifyComplete();
  }
  finally {
    // Restore the global hook whatever the verification outcome.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Per-test cleanup: resets the global {@code onOperatorError}, {@code onNextDropped},
 * {@code onErrorDropped}, {@code onOperatorDebug}, {@code onEachOperator} and
 * {@code onLastOperator} hooks by calling the matching {@code Hooks.resetXxx()} methods.
 */
@After
public void resetAllHooks() {
  Hooks.resetOnOperatorError();
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
  Hooks.resetOnOperatorDebug();
  Hooks.resetOnEachOperator();
  Hooks.resetOnLastOperator();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Subscribes to {@code UnicastProcessor.take(2)}, then pushes two errors directly into the
 * processor's {@code actual} subscriber, running the trackable-state assertions around
 * each step. Verifies that the sequence fails with "test"; the {@code onErrorDropped}
 * hook asserts that any error it receives is "test2".
 */
@Test
@SuppressWarnings("unchecked")
public void failFusedDoubleError() {
  UnicastProcessor<Integer> up = UnicastProcessor.create();
  Hooks.onErrorDropped(e -> assertThat(e).hasMessage("test2"));
  try {
    StepVerifier.create(up
                .take(2))
          .consumeSubscriptionWith(s -> {
            assertTrackableBeforeOnSubscribe((InnerOperator)s);
          })
          .then(() -> {
            assertTrackableAfterOnSubscribe((InnerOperator)up.actual);
            up.actual.onError(new Exception("test"));
            assertTrackableAfterOnComplete((InnerOperator)up.actual);
            up.actual.onError(new Exception("test2"));
          })
          .verifyErrorMessage("test");
  }
  finally {
    // Reset in finally so a failed verification cannot leak the global hook into other tests.
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Emits "foo" followed by an error into a {@code MonoDelayElement} with a 2-second delay
 * under virtual time, and verifies that "foo" is delivered after the delay and the
 * sequence completes. Afterwards asserts that the {@code onErrorDropped} hook never
 * received anything.
 */
@Test
public void errorAfterNextIsNeverTriggered() {
  TestPublisher<String> publisher = TestPublisher.create();
  AtomicReference<Throwable> dropped = new AtomicReference<>();
  Hooks.onErrorDropped(dropped::set);
  try {
    StepVerifier
        .withVirtualTime(() -> new MonoDelayElement<>(
            publisher.mono(), 2, TimeUnit.SECONDS, defaultSchedulerForDelay()))
        .expectSubscription()
        .then(() -> publisher.next("foo").error(new IllegalStateException("boom")))
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
  }
  finally {
    // Restore the global hook whatever the verification outcome.
    Hooks.resetOnErrorDropped();
  }
  assertThat(dropped.get()).isNull();
}

相关文章