reactor.core.publisher.Hooks类的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(147)

本文整理了Java中reactor.core.publisher.Hooks类的一些代码示例,展示了Hooks类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Hooks类的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks

Hooks介绍

[英]A set of overridable lifecycle hooks that can be used for cross-cutting added behavior on Flux/Mono operators.
[中]一组可重写的生命周期挂钩,可用于在Flux/Mono操作符上添加横切行为。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Registers a named on-last-operator sub-hook, then checks that
 * {@code Hooks.resetOnLastOperator()} leaves {@code Hooks.onLastOperatorHook}
 * null and {@code Hooks.getOnLastOperatorHooks()} empty.
 */
@Test
public void onLastOperatorReset() {
  Hooks.onLastOperator("some", p -> p);
  try {
    // Preconditions: the sub-hook is visible before the reset.
    assertThat(Hooks.onLastOperatorHook).isNotNull();
    assertThat(Hooks.getOnLastOperatorHooks()).hasSize(1);
  }
  finally {
    // Reset even when a precondition fails, so the "some" sub-hook
    // cannot stay installed for whatever runs after this test.
    Hooks.resetOnLastOperator();
  }
  assertThat(Hooks.onLastOperatorHook).isNull();
  assertThat(Hooks.getOnLastOperatorHooks()).isEmpty();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Installs the three recording hooks: operator errors are captured as a
 * tuple of optional failure and optional data, dropped elements and dropped
 * errors are offered to their respective collectors.
 */
private void plugHooks() {
  Hooks.onErrorDropped(droppedErrors::offer);
  Hooks.onNextDropped(droppedElements::offer);
  Hooks.onOperatorError((failure, data) -> {
    // Either argument may be null, hence the Optional wrapping.
    operatorErrors.offer(
        Tuples.of(Optional.ofNullable(failure), Optional.ofNullable(data)));
    // The failure is passed through unchanged.
    return failure;
  });
}

代码示例来源:origin: reactor/reactor-core

/**
 * With an on-error-dropped hook that itself throws, processing a failure
 * through {@code OnNextFailureStrategy.resumeDrop()} is expected to return
 * the hook's exception with the original failure suppressed, while the
 * value is still handed to the on-next-dropped hook.
 */
@Test
public void resumeDropErrorHookFails() {
  final AtomicReference<Object> droppedValue = new AtomicReference<>();
  final UnsupportedOperationException hookFailure = new UnsupportedOperationException("error hook");
  Hooks.onNextDropped(droppedValue::set);
  Hooks.onErrorDropped(ignored -> { throw hookFailure; });
  try {
    final Throwable original = new NullPointerException("foo");
    final Throwable processed = OnNextFailureStrategy.resumeDrop()
                                                     .process(original, "foo", Context.empty());

    assertThat(processed).hasMessage("error hook")
                         .hasSuppressedException(original);
    assertThat(droppedValue.get()).isEqualTo("foo");
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Resets the on-error-dropped, on-next-dropped and on-operator-error hooks.
 */
public void unplugHooks() {
  Hooks.resetOnErrorDropped();
  Hooks.resetOnNextDropped();
  Hooks.resetOnOperatorError();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Resets five hook families in one call: the two operator-assembly hooks
 * (each / last), the operator-error hook, and the two dropped-signal hooks
 * (next / error).
 */
final void resetHooks() {
  // operator assembly hooks
  Hooks.resetOnEachOperator();
  Hooks.resetOnLastOperator();
  // error mapping hook
  Hooks.resetOnOperatorError();
  // dropped signal hooks
  Hooks.resetOnNextDropped();
  Hooks.resetOnErrorDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void accumulatingHooks() throws Exception {
  AtomicReference<String> ref = new AtomicReference<>();
  Hooks.onNextDropped(d -> {
    ref.set(d.toString());
  });
  Hooks.onNextDropped(d -> {
    ref.set(ref.get()+"bar");
  });
  Hooks.onErrorDropped(d -> {
    ref.set(d.getMessage());
  });
  Hooks.onErrorDropped(d -> {
    ref.set(ref.get()+"bar");
  });
  Hooks.resetOnErrorDropped();
  Hooks.onOperatorError((error, d) -> {
    ref.set(d.toString());
    return new Exception("bar");
  });
  Hooks.onOperatorError((error, d) -> {
    ref.set(ref.get()+error.getMessage());
    return error;
  Hooks.resetOnOperatorError();
  Hooks.onEachOperator(h -> {

代码示例来源:origin: reactor/reactor-core

/**
 * Installs an on-operator-error hook that records the data argument it
 * receives, then checks the recorded value for four failing pipelines:
 * {@code flatMapMany} / {@code flatMap} returning {@code Flux.error}, and
 * {@code flatMapMany} / {@code flatMap} whose mapper throws.
 */
@Test
public void onOperatorError() {
  AtomicReference<Object> errorValue = new AtomicReference<>();
  Hooks.onOperatorError((error, d) -> {
    errorValue.set(d);
    return error;
  });
  try {
    Flux<Integer> f1 = Mono.just(1).flatMapMany(i -> Flux.error(new Exception("test")));
    StepVerifier.create(f1).verifyErrorMessage("test");
    assertThat(errorValue.get()).isEqualTo(1);

    Flux<Integer> f2 = Mono.just(2).flatMapMany(i -> {
      throw new RuntimeException("test");
    });
    StepVerifier.create(f2).verifyErrorMessage("test");
    assertThat(errorValue.get()).isEqualTo(2);

    Flux<Integer> f3 = Flux.just(3, 6, 9).flatMap(i -> Flux.error(new Exception("test")));
    StepVerifier.create(f3).verifyErrorMessage("test");
    assertThat(errorValue.get()).isEqualTo(3);

    Flux<Integer> f4 = Flux.just(4, 8, 12).flatMap(i -> {
      throw new RuntimeException("test");
    });
    StepVerifier.create(f4).verifyErrorMessage("test");
    assertThat(errorValue.get()).isEqualTo(4);
  }
  finally {
    // Always remove the recording hook: previously a failed assertion
    // skipped the reset and left the hook installed.
    Hooks.resetOnOperatorError();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Subscribes to a {@code Mono.just} with a completion callback that throws,
 * while an on-error-dropped hook asserts that any error it receives carries
 * the message "complete".
 */
@Test
public void completeHookErrorDropped() {
  Hooks.onErrorDropped(dropped -> assertTrue(dropped.getMessage().equals("complete")));
  // Completion callback that always fails.
  final Runnable failingCompletion = () -> { throw new IllegalStateException("complete");};
  try {
    Mono.just("foo")
        .subscribe(value -> {},
            error -> {},
            failingCompletion);
  }
  finally {
    Hooks.resetOnErrorDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Installs an on-last-operator hook that prints and collects every publisher
 * it is applied to, runs a small pipeline through {@code StepVerifier}, and
 * expects the hook to have been applied five times.
 */
@Test
public void testOnLastPublisher() throws Exception {
  // Parameterized instead of the raw Publisher type; the hook receives Publisher<Object>.
  List<Publisher<Object>> l = new ArrayList<>();
  Hooks.onLastOperator(p -> {
    System.out.println(Scannable.from(p).parents().count());
    System.out.println(Scannable.from(p).stepName());
    l.add(p);
    return p;
  });
  try {
    StepVerifier.create(Flux.just(1, 2, 3)
                .map(m -> m)
                .takeUntilOther(Mono.never())
                .flatMap(d -> Mono.just(d).hide()))
          .expectNext(1, 2, 3)
          .verifyComplete();
  }
  finally {
    // Reset even if verification fails, so the collecting hook does not stay installed.
    Hooks.resetOnLastOperator();
  }
  assertThat(l).hasSize(5);
}

代码示例来源:origin: reactor/reactor-core

/**
 * With operator debugging enabled, blocks on a callable-backed Mono that
 * throws and checks that the first suppressed exception's message mentions
 * "MonoCallable". Fails with {@link IllegalStateException} when no exception
 * is raised at all.
 */
@Test
public void testTrace() throws Exception {
  Hooks.onOperatorDebug();
  try {
    Mono.fromCallable(() -> {
      throw new RuntimeException();
    })
      .map(value -> value)
      .block();
  }
  catch(Exception caught){
    caught.printStackTrace();
    final String firstSuppressedMessage = caught.getSuppressed()[0].getMessage();
    Assert.assertTrue(firstSuppressedMessage.contains("MonoCallable"));
    // Expected path: the failure was observed and verified.
    return;
  }
  finally {
    Hooks.resetOnOperatorDebug();
  }
  // Only reached when block() completed without throwing.
  throw new IllegalStateException();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Registers a named on-operator-error sub-hook, then checks that
 * {@code Hooks.resetOnOperatorError()} leaves {@code Hooks.onOperatorErrorHook}
 * null and {@code Hooks.getOnOperatorErrorHooks()} empty.
 */
@Test
public void onOperatorErrorReset() {
  Hooks.onOperatorError("some", (t, v) -> t);
  try {
    // Preconditions: the sub-hook is visible before the reset.
    assertThat(Hooks.onOperatorErrorHook).isNotNull();
    assertThat(Hooks.getOnOperatorErrorHooks()).hasSize(1);
  }
  finally {
    // Reset even when a precondition fails, so the "some" sub-hook
    // cannot stay installed for whatever runs after this test.
    Hooks.resetOnOperatorError();
  }
  assertThat(Hooks.onOperatorErrorHook).isNull();
  assertThat(Hooks.getOnOperatorErrorHooks()).isEmpty();
}

代码示例来源:origin: reactor/reactor-core

/**
 * Add a {@link Publisher} operator interceptor for the last operator created
 * in every flow ({@link Flux} or {@link Mono}). The passed function is applied
 * to the original operator {@link Publisher} and can return a different {@link Publisher},
 * on the condition that it generically maintains the same data type as the original.
 * <p>
 * Note that sub-hooks are cumulative, but invoking this method twice with the same
 * instance (or any instance that has the same {@code toString}) will result in only a single
 * instance being applied. See {@link #onLastOperator(String, Function)} for a variant
 * that allows you to name the sub-hooks (and thus replace them or remove them individually
 * later on). Can be fully reset via {@link #resetOnLastOperator()}.
 * <p>
 * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
 * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
 * operator calls would effectively invoke onEachOperator from onEachOperator.
 *
 * @param onLastOperator the sub-hook: a function to intercept last operation call
 * (e.g. {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
 *
 * @throws NullPointerException if {@code onLastOperator} is null
 *
 * @see #onLastOperator(String, Function)
 * @see #resetOnLastOperator(String)
 * @see #resetOnLastOperator()
 * @see #onEachOperator(Function)
 */
public static void onLastOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onLastOperator) {
  // Fail with a named message instead of an anonymous NPE from toString().
  // Fully qualified so no additional import is required.
  java.util.Objects.requireNonNull(onLastOperator, "onLastOperator");
  // The function's toString() doubles as the sub-hook key.
  onLastOperator(onLastOperator.toString(), onLastOperator);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks the scanned state of the first inner subscriber of the given
 * coordinator (terminated, one buffered element, cancelled), then sends it
 * one more {@code onNext} while a no-op on-next-dropped hook is installed.
 *
 * @param c the zip coordinator whose first inner subscriber is inspected
 */
@SuppressWarnings("unchecked")
void assertInnerSubscriber(FluxZip.ZipSingleCoordinator c) {
  FluxZip.ZipSingleSubscriber s = (FluxZip.ZipSingleSubscriber) c.inners()
                                  .findFirst()
                                  .get();
  assertThat(s.scan(Scannable.Attr.TERMINATED)).isTrue();
  assertThat(s.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
  assertThat(s.scan(Scannable.Attr.CANCELLED)).isTrue();
  // NOTE(review): presumably the extra onNext is expected to be dropped and the
  // no-op hook keeps that silent — confirm against ZipSingleSubscriber.
  Hooks.onNextDropped(v -> {
  });
  try {
    s.onNext(0);
  }
  finally {
    // Remove the no-op hook even if onNext throws.
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

/**
 * Add a custom error mapping, overriding the default one. Custom mapping can be an
 * accumulation of several sub-hooks each subsequently added via this method.
 * <p>
 * Note that sub-hooks are cumulative, but invoking this method twice with the same
 * instance (or any instance that has the same {@code toString}) will result in only a single
 * instance being applied. See {@link #onOperatorError(String, BiFunction)} for a variant
 * that allows you to name the sub-hooks (and thus replace them or remove them individually
 * later on). Can be fully reset via {@link #resetOnOperatorError()}.
 * <p>
 * For reference, the default mapping is to unwrap the exception and, if the second
 * parameter is another exception, to add it to the first as suppressed.
 *
 * @param onOperatorError an operator error {@link BiFunction} mapper, returning an arbitrary exception
 * given the failure and optionally some original context (data or error).
 *
 * @throws NullPointerException if {@code onOperatorError} is null
 *
 * @see #onOperatorError(String, BiFunction)
 * @see #resetOnOperatorError(String)
 * @see #resetOnOperatorError()
 */
public static void onOperatorError(BiFunction<? super Throwable, Object, ? extends Throwable> onOperatorError) {
  // Fail with a named message instead of an anonymous NPE from toString().
  // Fully qualified so no additional import is required.
  java.util.Objects.requireNonNull(onOperatorError, "onOperatorError");
  // The function's toString() doubles as the sub-hook key.
  onOperatorError(onOperatorError.toString(), onOperatorError);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void verboseExtension() {
  Queue<String> q = new LinkedTransferQueue<>();
  Hooks.onEachOperator(p -> {
    q.offer(p.toString());
    return p;
  });
  Hooks.onOperatorDebug();
  Hooks.resetOnEachOperator();
  Hooks.onEachOperator(p -> {
    q.offer(p.toString());
    return p;
  Hooks.resetOnEachOperator();

代码示例来源:origin: reactor/reactor-core

/**
 * Prepares {@code toDebug} for the debugging tests, selected by the running
 * test's method name: "debuggingCommonStacktrace" builds it without operator
 * debugging, names starting with "debuggingActivated" enable operator
 * debugging first. Any other test leaves {@code toDebug} untouched.
 */
@Before
public void populateDebug() {
  final String currentTest = testName.getMethodName();
  final boolean debugActivated = currentTest.startsWith("debuggingActivated");
  final boolean commonStacktrace = currentTest.equals("debuggingCommonStacktrace");

  if (!commonStacktrace && !debugActivated) {
    return;
  }
  if (debugActivated) {
    // Must be enabled before the pipeline is assembled below.
    Hooks.onOperatorDebug();
  }
  toDebug = scatterAndGather(urls());
}

代码示例来源:origin: reactor/reactor-core

};
Hooks.onOperatorError("1", hook1);
Hooks.onOperatorError("2", hook2);
Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom"), "foo");
assertThat(Hooks.getOnOperatorErrorHooks())
    .containsOnlyKeys("1", "2");
assertThat(Hooks.getOnOperatorErrorHooks().values())
    .containsExactly(hook1, hook2);
assertThat(applied).containsExactly("h1", "h2");
Hooks.onOperatorError("1", hook3);
Hooks.onOperatorErrorHook.apply(new IllegalStateException("boom2"), "bar");
assertThat(Hooks.getOnOperatorErrorHooks())
    .containsOnlyKeys("1", "2");
assertThat(Hooks.getOnOperatorErrorHooks().values())
    .containsExactly(hook3, hook2);
assertThat(applied).containsExactly("h3", "h2");

代码示例来源:origin: reactor/reactor-core

};
Hooks.onLastOperator("1", hook1);
Hooks.onLastOperator("2", hook2);
Hooks.onLastOperatorHook.apply(s -> {});
assertThat(Hooks.getOnLastOperatorHooks())
    .containsOnlyKeys("1", "2");
assertThat(Hooks.getOnLastOperatorHooks().values())
    .containsExactly(hook1, hook2);
assertThat(applied).containsExactly("h1", "h2");
Hooks.onLastOperator("1", hook3);
Hooks.onLastOperatorHook.apply(s -> {});
assertThat(Hooks.getOnLastOperatorHooks())
    .containsOnlyKeys("1", "2");
assertThat(Hooks.getOnLastOperatorHooks().values())
    .containsExactly(hook3, hook2);
assertThat(applied).containsExactly("h3", "h2");

代码示例来源:origin: reactor/reactor-core

@Test
public void parallelModeFused() {
  Hooks.onOperatorDebug();
  Hooks.onEachOperator(p -> {
    System.out.println(Scannable.from(p).stepName());
    return p;

代码示例来源:origin: reactor/reactor-core

/**
 * Registers a named on-each-operator sub-hook, then checks that
 * {@code Hooks.resetOnEachOperator()} leaves {@code Hooks.onEachOperatorHook}
 * null and {@code Hooks.getOnEachOperatorHooks()} empty.
 */
@Test
public void onEachOperatorReset() {
  Hooks.onEachOperator("some", p -> p);
  try {
    // Preconditions: the sub-hook is visible before the reset.
    assertThat(Hooks.onEachOperatorHook).isNotNull();
    assertThat(Hooks.getOnEachOperatorHooks()).hasSize(1);
  }
  finally {
    // Reset even when a precondition fails, so the "some" sub-hook
    // cannot stay installed for whatever runs after this test.
    Hooks.resetOnEachOperator();
  }
  assertThat(Hooks.onEachOperatorHook).isNull();
  assertThat(Hooks.getOnEachOperatorHooks()).isEmpty();
}

相关文章