reactor.core.publisher.Mono.fromDirect()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(11.9k)|赞(0)|评价(0)|浏览(422)

本文整理了Java中reactor.core.publisher.Mono.fromDirect()方法的一些代码示例,展示了Mono.fromDirect()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Mono.fromDirect()方法的具体详情如下:
包路径:reactor.core.publisher.Mono
类名称:Mono
方法名:fromDirect

Mono.fromDirect介绍

[英]Convert a Publisher to a Mono without any cardinality check (ie this method doesn't check if the source is already a Mono, nor cancels the source past the first element). Conversion supports Fuseable sources. Note this is an advanced interoperability operator that implies you know the Publisher you are converting follows the Mono semantics and only ever emits one element.
[中]无需任何基数检查即可将发布服务器转换为Mono(即此方法不会检查源是否已经是Mono,也不会取消超过第一个元素的源)。转换支持可融合源。注意,这是一个高级互操作性操作符,意味着您知道要转换的发布服务器遵循Mono语义,并且只发出一个元素。

代码示例

代码示例来源:origin: reactor/reactor-core

@Override
public Mono<T> mono() {
  if (violations.isEmpty()) {
    return Mono.from(this);
  }
  else {
    return Mono.fromDirect(this);
  }
}

代码示例来源:origin: reactor/reactor-core

@Override
protected Mono<I> withFluxSource(Flux<I> input) {
  return Mono.fromDirect(input);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Return a {@code Mono<Void>} that waits for this {@link Mono} to complete then
 * for a supplied {@link Publisher Publisher&lt;Void&gt;} to also complete. The
 * second completion signal is replayed, or any error signal that occurs instead.
 * <p>
 * <img class="marble" src="doc-files/marbles/thenEmptyForMono.svg" alt="">
 *
 * @reactor.discard This operator discards the element from the source.
 *
 * @param other a {@link Publisher} to wait for after this Mono's termination
 * @return a new {@link Mono} completing when both publishers have completed in
 * sequence
 */
public final Mono<Void> thenEmpty(Publisher<Void> other) {
  return then(fromDirect(other));
}

代码示例来源:origin: reactor/reactor-core

/**
 * Return a {@code Mono<Void>} that waits for this {@link Flux} to complete then
 * for a supplied {@link Publisher Publisher&lt;Void&gt;} to also complete. The
 * second completion signal is replayed, or any error signal that occurs instead.
 * <p>
 * <img class="marble" src="doc-files/marbles/thenEmptyForFlux.svg" alt="">
 *
 * @reactor.discard This operator discards elements from the source.
 *
 * @param other a {@link Publisher} to wait for after this Flux's termination
 * @return a new {@link Mono} completing when both publishers have completed in
 * sequence
 */
public final Mono<Void> thenEmpty(Publisher<Void> other) {
  return then(Mono.fromDirect(other));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void completeOnNextWithoutCancel() {
  AtomicInteger onCancel = new AtomicInteger();
  AtomicInteger sourceOnCancel = new AtomicInteger();
  AtomicInteger onTerminate = new AtomicInteger();
  AtomicInteger sourceOnTerminate = new AtomicInteger();
  Mono<String> source = Mono.<String>fromDirect(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext("foo");
  })
  .doOnCancel(sourceOnCancel::incrementAndGet)
  .doOnSuccessOrError((v, e) -> sourceOnTerminate.incrementAndGet());
  StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
      2,
      TimeUnit.SECONDS,
      defaultSchedulerForDelay())
      .doOnCancel(onCancel::incrementAndGet)
      .doOnSuccessOrError((v, e) -> onTerminate.incrementAndGet()))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
  assertThat(onTerminate.get()).isEqualTo(1);
  assertThat(sourceOnTerminate.get()).isEqualTo(1);
  assertThat(onCancel.get()).isEqualTo(0);
  assertThat(sourceOnCancel.get()).isEqualTo(0);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriber() {
  Flux<String> source = Flux.just("foo").map(i -> i);
  Mono<String> test = Mono.fromDirect(source);
  assertThat(Scannable.from(test).scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(Scannable.from(test).scan(Scannable.Attr.ACTUAL)).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void testCallbacksFusionSync() {
  AtomicReference<Integer> successInvocation = new AtomicReference<>();
  AtomicReference<Integer> onTerminateInvocation = new AtomicReference<>();
  AtomicReference<Integer> afterTerminateInvocation = new AtomicReference<>();
  AtomicReference<Throwable> error = new AtomicReference<>();
  Mono<Integer> source = Mono.fromDirect(Flux.range(55, 1));
  Mono<Integer> mono = new MonoPeekTerminal<>(source,
      successInvocation::set,
      (v, t) -> {
        onTerminateInvocation.set(v);
        error.set(t);
      },
      (v, t) -> {
        afterTerminateInvocation.set(v);
        error.set(t);
      });
  StepVerifier.create(mono)
        .expectFusion(Fuseable.SYNC, Fuseable.SYNC) //TODO in 3.0.3 this doesn't work
        .expectNext(55)
        .expectComplete()
        .verify();
  assertEquals(55, (Object) successInvocation.get());
  assertEquals(55, (Object) onTerminateInvocation.get());
  assertEquals(55, (Object) afterTerminateInvocation.get());
  assertEquals(null, error.get());
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

public GatewayFilter apply(Repeat<ServerWebExchange> repeat, Retry<ServerWebExchange> retry) {
  return (exchange, chain) -> {
    trace("Entering retry-filter");
    // chain.filter returns a Mono<Void>
    Publisher<Void> publisher = chain.filter(exchange)
        //.log("retry-filter", Level.INFO)
        .doOnSuccessOrError((aVoid, throwable) -> {
          int iteration = exchange.getAttributeOrDefault(RETRY_ITERATION_KEY, -1);
          int newIteration = iteration + 1;
          trace("setting new iteration in attr %d", newIteration);
          exchange.getAttributes().put(RETRY_ITERATION_KEY, newIteration);
        });
    if (retry != null) {
      // retryWhen returns a Mono<Void>
      // retry needs to go before repeat
      publisher = ((Mono<Void>)publisher).retryWhen(retry.withApplicationContext(exchange));
    }
    if (repeat != null) {
      // repeatWhen returns a Flux<Void>
      // so this needs to be last and the variable a Publisher<Void>
      publisher = ((Mono<Void>)publisher).repeatWhen(repeat.withApplicationContext(exchange));
    }
    return Mono.fromDirect(publisher);
  };
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriberHide() {
  Flux<String> source = Flux.just("foo").hide();
  Mono<String> test = Mono.fromDirect(source);
  assertThat(Scannable.from(test).scan(Scannable.Attr.PARENT)).isSameAs(source);
  assertThat(Scannable.from(test).scan(Scannable.Attr.ACTUAL)).isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void guardedAgainstOnError() {
  AtomicReference<Throwable> dropped = new AtomicReference<>();
  Hooks.onErrorDropped(dropped::set);
  Mono<String> source = Mono.fromDirect(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext("foo");
    s.onError(new IllegalStateException("boom"));
  });
  try {
    StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
        2,
        TimeUnit.SECONDS,
        defaultSchedulerForDelay()))
          .expectSubscription()
          .expectNoEvent(Duration.ofSeconds(2))
          .expectNext("foo")
          .verifyComplete();
  }
  finally {
    Hooks.resetOnErrorDropped();
  }
  assertThat(dropped.get()).hasMessage("boom")
               .isInstanceOf(IllegalStateException.class);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void guardedAgainstMultipleOnNext() {
  AtomicReference<Object> dropped = new AtomicReference<>();
  Hooks.onNextDropped(dropped::set);
  Mono<String> source = Mono.fromDirect(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext("foo");
    s.onNext("bar");
    s.onComplete();
  });
  try {
    StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
        2,
        TimeUnit.SECONDS,
        defaultSchedulerForDelay()))
          .expectSubscription()
          .expectNoEvent(Duration.ofSeconds(2))
          .expectNext("foo")
          .verifyComplete();
  }
  finally {
    Hooks.resetOnNextDropped();
  }
  assertThat(dropped.get()).isEqualTo("bar");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void guardedAgainstOnComplete() {
  Mono<String> source = Mono.fromDirect(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onNext("foo");
    s.onComplete();
  });
  StepVerifier.withVirtualTime(() -> new MonoDelayElement<>(source,
      2,
      TimeUnit.SECONDS,
      defaultSchedulerForDelay()))
        .expectSubscription()
        .expectNoEvent(Duration.ofSeconds(2))
        .expectNext("foo")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void errorMonoState(){
  Hooks.onErrorDropped(e -> assertTrue(e.getMessage().equals("test2")));
  Hooks.onNextDropped(d -> assertTrue(d.equals("test2")));
  try {
    Mono.fromDirect(s -> {
      assertTrue(s instanceof LambdaMonoSubscriber);
      LambdaMonoSubscriber<?> bfs = (LambdaMonoSubscriber<?>) s;
      Operators.error(s, new Exception("test"));
      s.onComplete();
      s.onError(new Exception("test2"));
      s.onNext("test2");
      assertTrue(bfs.scan(Scannable.Attr.TERMINATED));
      bfs.dispose();
    })
         .subscribe(s -> {
         }, e -> {
         }, () -> {
         });
  }
  finally {
    Hooks.resetOnErrorDropped();
    Hooks.resetOnNextDropped();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void emptyMonoState(){
  assertTrue(Mono.fromDirect(s -> {
    assertTrue(s instanceof LambdaMonoSubscriber);
    LambdaMonoSubscriber<?> bfs = (LambdaMonoSubscriber<?>)s;
    assertTrue(bfs.scan(Scannable.Attr.PREFETCH) == Integer.MAX_VALUE);
    assertFalse(bfs.scan(Scannable.Attr.TERMINATED));
    bfs.onSubscribe(Operators.emptySubscription());
    bfs.onSubscribe(Operators.emptySubscription()); // noop
    s.onComplete();
    assertTrue(bfs.scan(Scannable.Attr.TERMINATED));
    bfs.dispose();
    bfs.dispose();
  }).subscribe(s -> {}, null, () -> {}).isDisposed());
  assertFalse(Mono.never().subscribe(null, null, () -> {}).isDisposed());
}

代码示例来源:origin: io.projectreactor.ipc/reactor-netty

@Override
public final Mono<Void> onClose() {
  return Mono.fromDirect(onInactive);
}

代码示例来源:origin: io.projectreactor/reactor-test

@Override
public Mono<T> mono() {
  if (violations.isEmpty()) {
    return Mono.from(this);
  }
  else {
    return Mono.fromDirect(this);
  }
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Return a {@code Mono<Void>} that waits for this {@link Flux} to complete then
 * for a supplied {@link Publisher Publisher&lt;Void&gt;} to also complete. The
 * second completion signal is replayed, or any error signal that occurs instead.
 * <p>
 * <img class="marble" src="doc-files/marbles/thenEmptyForFlux.svg" alt="">
 *
 * @reactor.discard This operator discards elements from the source.
 *
 * @param other a {@link Publisher} to wait for after this Flux's termination
 * @return a new {@link Mono} completing when both publishers have completed in
 * sequence
 */
public final Mono<Void> thenEmpty(Publisher<Void> other) {
  return then(Mono.fromDirect(other));
}

代码示例来源:origin: io.projectreactor/reactor-core

/**
 * Return a {@code Mono<Void>} that waits for this {@link Mono} to complete then
 * for a supplied {@link Publisher Publisher&lt;Void&gt;} to also complete. The
 * second completion signal is replayed, or any error signal that occurs instead.
 * <p>
 * <img class="marble" src="doc-files/marbles/thenEmptyForMono.svg" alt="">
 *
 * @reactor.discard This operator discards the element from the source.
 *
 * @param other a {@link Publisher} to wait for after this Mono's termination
 * @return a new {@link Mono} completing when both publishers have completed in
 * sequence
 */
public final Mono<Void> thenEmpty(Publisher<Void> other) {
  return then(fromDirect(other));
}

代码示例来源:origin: reactor/reactor-netty

/**
 * Attach an IO handler to react on connected client
 *
 * @param handler an IO handler that can dispose underlying connection when {@link
 * Publisher} terminates.
 *
 * @return a new {@link UdpServer}
 */
public final UdpServer handle(BiFunction<? super UdpInbound, ? super UdpOutbound, ? extends Publisher<Void>> handler) {
  Objects.requireNonNull(handler, "handler");
  return doOnBound(c -> {
    if (log.isDebugEnabled()) {
      log.debug(format(c.channel(), "Handler is being applied: {}"), handler);
    }
    Mono.fromDirect(handler.apply((UdpInbound) c, (UdpOutbound) c))
      .subscribe(c.disposeSubscriber());
  });
}

代码示例来源:origin: reactor/reactor-netty

/**
 * Attach an IO handler to react on connected client
 *
 * @param handler an IO handler that can dispose underlying connection when {@link
 * Publisher} terminates.
 *
 * @return a new {@link TcpClient}
 */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
  Objects.requireNonNull(handler, "handler");
  return doOnConnected(c -> {
    if (log.isDebugEnabled()) {
      log.debug(format(c.channel(), "Handler is being applied: {}"), handler);
    }
    Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c))
      .subscribe(c.disposeSubscriber());
  });
}

相关文章

微信公众号

最新文章

更多

Mono类方法