reactor.core.publisher.Hooks.onLastOperator()方法的使用及代码示例

x33g5p2x  于2022-01-20 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(133)

本文整理了Java中reactor.core.publisher.Hooks.onLastOperator()方法的一些代码示例,展示了Hooks.onLastOperator()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Hooks.onLastOperator()方法的具体详情如下:
包路径:reactor.core.publisher.Hooks
类名称:Hooks
方法名:onLastOperator

Hooks.onLastOperator介绍

[英]Add or replace a named Publisher operator interceptor for the last operator created in every flow ( Flux or Mono). The passed function is applied to the original operator Publisher and can return a different Publisher, on the condition that it generically maintains the same data type as the original. Use of the Flux/ Mono APIs is discouraged as it will recursively call this hook, leading to StackOverflowError.

Note that sub-hooks are cumulative. Invoking this method twice with the same key will replace the old sub-hook with that name, but keep the execution order (eg. A-h1, B-h2, A-h3 will keep A-B execution order, leading to hooks h3 then h2 being executed). Removing a particular key using #resetOnLastOperator(String) then adding it back will result in the execution order changing (the later sub-hook being executed last). Can be fully reset via #resetOnLastOperator().

This pointcut function cannot make use of Flux, Mono or ParallelFlux APIs as it would lead to a recursive call to the hook: the operator calls would effectively invoke onEachOperator from onEachOperator.
[中]为每个流(Flux或Mono)中创建的最后一个操作符添加或替换命名发布者操作符拦截器。传递的函数应用于原始运算符发布服务器,并且可以返回不同的发布服务器,条件是它通常保持与原始服务器相同的数据类型。不鼓励使用Flux/MonoAPI,因为它会递归调用此钩子,从而导致StackOverflowerError。
请注意,子挂钩是累积的。使用同一个键调用此方法两次将用该名称替换旧的子钩子,但保留执行顺序(例如,A-h1、B-h2、A-h3将保留A-B执行顺序,导致钩子h3然后执行h2)。使用#resetonlastomoperator(String)删除一个特定的键,然后将其添加回去,将导致执行顺序发生变化(最后执行的是后面的子钩子)。可通过#ResetOnlaOperator()完全重置。
这个切入点函数不能使用Flux、Mono或ParallelFlux API,因为它会导致对钩子的递归调用:操作符调用将有效地从onEachOperator调用onEachOperator。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Add a {@link Publisher} operator interceptor for the last operator created
 * in every flow ({@link Flux} or {@link Mono}). The passed function is applied
 * to the original operator {@link Publisher} and can return a different {@link Publisher},
 * on the condition that it generically maintains the same data type as the original.
 * <p>
 * Note that sub-hooks are cumulative, but invoking this method twice with the same
 * instance (or any instance that has the same `toString`) will result in only a single
 * instance being applied. See {@link #onLastOperator(String, Function)} for a variant
 * that allows you to name the sub-hooks (and thus replace them or remove them individually
 * later on). Can be fully reset via {@link #resetOnLastOperator()}.
 * <p>
 * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
 * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
 * operator calls would effectively invoke onEachOperator from onEachOperator.
 *
 * @param onLastOperator the sub-hook: a function to intercept last operation call
 * (e.g. {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
 *
 * @see #onLastOperator(String, Function)
 * @see #resetOnLastOperator(String)
 * @see #resetOnLastOperator()
 * @see #onEachOperator(Function)
 */
public static void onLastOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onLastOperator) {
  onLastOperator(onLastOperator.toString(), onLastOperator);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorSameLambdaSameNameAppliedOnce() {
  AtomicInteger applied = new AtomicInteger();
  Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> {
    applied.incrementAndGet();
    return p;
  };
  Hooks.onLastOperator(hook);
  Hooks.onLastOperator(hook);
  Hooks.onLastOperatorHook.apply(s -> {});
  assertThat(applied.get()).isEqualTo(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorSameLambdaDifferentNamesAppliedTwice() {
  AtomicInteger applied = new AtomicInteger();
  Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> {
    applied.incrementAndGet();
    return p;
  };
  Hooks.onLastOperator(hook);
  Hooks.onLastOperator("other", hook);
  Hooks.onLastOperatorHook.apply(s -> {});
  assertThat(applied.get()).isEqualTo(2);
}

代码示例来源:origin: spring-cloud/spring-cloud-sleuth

void setupHooks(BeanFactory beanFactory) {
  Hooks.onLastOperator(
      TraceReactorAutoConfiguration.TraceReactorConfiguration.SLEUTH_TRACE_REACTOR_KEY,
      ReactorSleuth.scopePassingSpanOperator(this.context));
  Schedulers.setFactory(factoryInstance(beanFactory));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorOneHookNoComposite() {
  Function<? super Publisher<Object>, ? extends Publisher<Object>> hook = p -> p;
  Hooks.onLastOperator(hook);
  assertThat(Hooks.onLastOperatorHook).isSameAs(hook);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorResetSpecific() {
  List<String> applied = new ArrayList<>(3);
  Function<? super Publisher<Object>, ? extends Publisher<Object>> hook1 = p -> {
    applied.add("h1");
    return p;
  };
  Function<? super Publisher<Object>, ? extends Publisher<Object>> hook2 = p -> {
    applied.add("h2");
    return p;
  };
  Hooks.onLastOperator("1", hook1);
  Hooks.onLastOperator(hook2);
  Hooks.onLastOperatorHook.apply(s -> {});
  assertThat(Hooks.getOnLastOperatorHooks()).hasSize(2);
  assertThat(applied).containsExactly("h1", "h2");
  applied.clear();
  Hooks.resetOnLastOperator("1");
  Hooks.onLastOperatorHook.apply(s -> {});
  assertThat(Hooks.getOnLastOperatorHooks()).hasSize(1);
  assertThat(applied).containsExactly("h2");
}

代码示例来源:origin: reactor/reactor-core

};
Hooks.onLastOperator("1", hook1);
Hooks.onLastOperator("2", hook2);
Hooks.onLastOperatorHook.apply(s -> {});
Hooks.onLastOperator("1", hook3);
Hooks.onLastOperatorHook.apply(s -> {});

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorClearByName() {
  Hooks.onLastOperator("some", p -> p);
  Hooks.onLastOperator("other", p -> p);
  assertThat(Hooks.onLastOperatorHook).isNotNull();
  assertThat(Hooks.getOnLastOperatorHooks()).hasSize(2);
  Hooks.resetOnLastOperator("some");
  assertThat(Hooks.onLastOperatorHook).isNotNull();
  assertThat(Hooks.getOnLastOperatorHooks())
      .hasSize(1)
      .containsOnlyKeys("other");
  Hooks.resetOnLastOperator("other");
  assertThat(Hooks.onLastOperatorHook).isNull();
  assertThat(Hooks.getOnLastOperatorHooks()).isEmpty();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void onLastOperatorReset() {
  Hooks.onLastOperator("some", p -> p);
  assertThat(Hooks.onLastOperatorHook).isNotNull();
  assertThat(Hooks.getOnLastOperatorHooks()).hasSize(1);
  Hooks.resetOnLastOperator();
  assertThat(Hooks.onLastOperatorHook).isNull();
  assertThat(Hooks.getOnLastOperatorHooks()).isEmpty();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testOnLastPublisher() throws Exception {
  List<Publisher> l = new ArrayList<>();
  Hooks.onLastOperator(p -> {
    System.out.println(Scannable.from(p).parents().count());
    System.out.println(Scannable.from(p).stepName());
    l.add(p);
    return p;
  });
  StepVerifier.create(Flux.just(1, 2, 3)
              .map(m -> m)
              .takeUntilOther(Mono.never())
              .flatMap(d -> Mono.just(d).hide()))
        .expectNext(1, 2, 3)
        .verifyComplete();
  Hooks.resetOnLastOperator();
  assertThat(l).hasSize(5);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testOnLastPublisher() throws Exception {
  List<Publisher> l = new ArrayList<>();
  Hooks.onLastOperator(p -> {
    System.out.println(Scannable.from(p).parents().count());
    System.out.println(Scannable.from(p).stepName());
    l.add(p);
    return p;
  });
  StepVerifier.create(Flux.just(1, 2, 3)
              .map(m -> m)
              .takeUntilOther(Mono.never())
              .flatMap(d -> Mono.just(d).hide()))
        .expectNext(1, 2, 3)
        .verifyComplete();
  Hooks.resetOnLastOperator();
  assertThat(l).hasSize(5);
}

代码示例来源:origin: reactor/reactor-core

hook2.set(null);
Hooks.onLastOperator(h -> {
  final Flux<Object> publisher = TestPublisher.create().flux();
  hook.set(publisher);
  return publisher;
});
Hooks.onLastOperator(h -> {
  hook2.set(h);
  return h;

代码示例来源:origin: reactor/reactor-core

@Test
public void lastOperatorTest() {
  Hooks.onLastOperator(Operators.lift((sc, sub) ->
      new CoreSubscriber<Object>(){
        @Override

代码示例来源:origin: reactor/reactor-core

@Test
public void lastOperatorTest() {
  Hooks.onLastOperator(Operators.lift((sc, sub) ->
      new CoreSubscriber<Object>(){
        @Override

代码示例来源:origin: reactor/reactor-core

@Test
public void lastOperatorFilterTest() {
  Hooks.onLastOperator(Operators.lift(sc -> sc.tags()
                        .anyMatch(t -> t.getT1()
                                .contains("metric")),

代码示例来源:origin: reactor/reactor-core

@Test
public void lastOperatorFilterTest() {
  Hooks.onLastOperator(Operators.lift(sc -> sc.tags()
                        .anyMatch(t -> t.getT1()
                                .contains("metric")),

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Add a {@link Publisher} operator interceptor for the last operator created
 * in every flow ({@link Flux} or {@link Mono}). The passed function is applied
 * to the original operator {@link Publisher} and can return a different {@link Publisher},
 * on the condition that it generically maintains the same data type as the original.
 * <p>
 * Note that sub-hooks are cumulative, but invoking this method twice with the same
 * instance (or any instance that has the same `toString`) will result in only a single
 * instance being applied. See {@link #onLastOperator(String, Function)} for a variant
 * that allows you to name the sub-hooks (and thus replace them or remove them individually
 * later on). Can be fully reset via {@link #resetOnLastOperator()}.
 * <p>
 * This pointcut function cannot make use of {@link Flux}, {@link Mono} or
 * {@link ParallelFlux} APIs as it would lead to a recursive call to the hook: the
 * operator calls would effectively invoke onEachOperator from onEachOperator.
 *
 * @param onLastOperator the sub-hook: a function to intercept last operation call
 * (e.g. {@code map(fn2)} in {@code flux.map(fn).map(fn2).subscribe()})
 *
 * @see #onLastOperator(String, Function)
 * @see #resetOnLastOperator(String)
 * @see #resetOnLastOperator()
 * @see #onEachOperator(Function)
 */
public static void onLastOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onLastOperator) {
  onLastOperator(onLastOperator.toString(), onLastOperator);
}

代码示例来源:origin: apache/servicemix-bundles

@Override
public void beforeTestMethod(TestContext testContext) throws Exception {
  SecurityContext securityContext = TestSecurityContextHolder.getContext();
  Hooks.onLastOperator(CONTEXT_OPERATOR_KEY, Operators.lift((s, sub) -> new SecuritySubContext<>(sub, securityContext)));
}

代码示例来源:origin: org.springframework.security/spring-security-test

@Override
public void beforeTestMethod(TestContext testContext) throws Exception {
  SecurityContext securityContext = TestSecurityContextHolder.getContext();
  Hooks.onLastOperator(CONTEXT_OPERATOR_KEY, Operators.lift((s, sub) -> new SecuritySubContext<>(sub, securityContext)));
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-sleuth-core

void setupHooks(BeanFactory beanFactory) {
  Hooks.onLastOperator(
      TraceReactorAutoConfiguration.TraceReactorConfiguration.SLEUTH_TRACE_REACTOR_KEY,
      ReactorSleuth.scopePassingSpanOperator(this.context));
  Schedulers.setFactory(factoryInstance(beanFactory));
}

相关文章