Usage and code examples of the reactor.core.publisher.Flux.subscribe() method

x33g5p2x, reposted on 2022-01-19 under "Other"

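This article collects code examples for the Java method reactor.core.publisher.Flux.subscribe() and shows how it is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Flux.subscribe() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: subscribe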

About Flux.subscribe

Subscribe to this Flux and request unbounded demand.

This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
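
To make the difference between the no-argument form and the other variants concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name SubscribeVariants and the helper run are hypothetical names used only for illustration; the three-argument subscribe(consumer, errorConsumer, completeConsumer) overload is part of Flux itself.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SubscribeVariants {

    // Hypothetical helper: records every signal observed by a
    // three-argument subscribe() call (value, error, completion).
    static List<String> run(Flux<Integer> source) {
        List<String> seen = new ArrayList<>();
        source.subscribe(
                value -> seen.add("next:" + value),
                error -> seen.add("error:" + error.getMessage()),
                () -> seen.add("complete"));
        return seen;
    }

    public static void main(String[] args) {
        // Normal completion
        System.out.println(run(Flux.just(1, 2, 3)));
        // prints [next:1, next:2, next:3, complete]

        // Two values, then an error: the error consumer receives it
        Flux<Integer> failing = Flux.just(1, 2)
                .concatWith(Flux.error(new IllegalStateException("boom")));
        System.out.println(run(failing));
        // prints [next:1, next:2, error:boom]
    }
}
```

Because Flux.just emits synchronously during subscription, the list is already filled when subscribe() returns. With an asynchronous source the caller would have to wait for the terminal signal instead.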

Code examples

Code example source: codecentric/spring-boot-admin

@Override
public void subscribe(Subscriber<? super InstanceEvent> s) {
  // Delegate the incoming subscriber to the wrapped Flux
  publishedFlux.subscribe(s);
}

Code example source: codecentric/spring-boot-admin

@Override
public void start() {
  super.start();
  // The no-argument subscribe() only triggers the pipeline; errors are handled upstream by onErrorContinue
  intervalSubscription = Flux.interval(updateInterval)
      .doOnSubscribe(s -> log.debug("Scheduled status update every {}", updateInterval))
      .log(log.getName(), Level.FINEST)
      .subscribeOn(Schedulers.newSingle("status-monitor"))
      .concatMap(i -> this.updateStatusForAllInstances())
      .onErrorContinue((ex, value) -> log.warn("Unexpected error while updating statuses", ex))
      .subscribe();
}
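
The sample above stores the return value of subscribe() in a field. That value is a reactor.core.Disposable, which can later cancel the running pipeline. The following is a minimal sketch of that pattern, assuming reactor-core is on the classpath; DisposableDemo and startTicker are hypothetical names, not part of spring-boot-admin.

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class DisposableDemo {

    // Hypothetical helper: starts a periodic Flux and hands back
    // the Disposable produced by the no-argument subscribe().
    static Disposable startTicker(Duration period) {
        return Flux.interval(period)
                .doOnNext(tick -> System.out.println("tick " + tick))
                .subscribe();
    }

    public static void main(String[] args) throws InterruptedException {
        Disposable ticker = startTicker(Duration.ofMillis(50));
        Thread.sleep(200);   // let a few ticks pass
        ticker.dispose();    // cancels the upstream interval
        System.out.println("disposed: " + ticker.isDisposed());
    }
}
```

Keeping the Disposable is what allows a matching stop() method to shut the interval down; discarding it leaves no handle for cancellation.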

Code example source: spring-projects/spring-framework

@Override
public void subscribe(CoreSubscriber<? super Void> actual) {
  this.source.subscribe(new WriteBarrier(actual));
}

Code example source: spring-projects/spring-framework

private void releaseBody() {
  this.body.subscribe(DataBufferUtils.releaseConsumer());
}

Code example source: spring-projects/spring-framework

public Mono<byte[]> getContent() {
  return Mono.defer(() -> {
    if (this.content.isTerminated()) {
      return this.content;
    }
    if (!this.hasContentConsumer) {
      // Couple of possible cases:
      //  1. Mock server never consumed request body (e.g. error before read)
      //  2. FluxExchangeResult: getResponseBodyContent called before getResponseBody
      //noinspection ConstantConditions
      (this.publisher != null ? this.publisher : this.publisherNested)
          .onErrorMap(ex -> new IllegalStateException(
              "Content has not been consumed, and " +
                  "an error was raised while attempting to produce it.", ex))
          .subscribe();
    }
    return this.content;
  });
}

Code example source: spring-projects/spring-framework

/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
 * Does <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
    Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Flux<DataBuffer> flux = Flux.from(source);
  return Flux.create(sink -> {
    AsynchronousFileChannelWriteCompletionHandler completionHandler =
        new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
    sink.onDispose(completionHandler);
    flux.subscribe(completionHandler);
  });
}
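
The Javadoc above points out that writing does not start until the returned Flux is subscribed to. The same deferral can be sketched with a plain Flux.create, assuming reactor-core is on the classpath; LazySubscribeDemo, lazySource, and the counter parameter are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazySubscribeDemo {

    // Hypothetical helper: the callback passed to Flux.create runs once per
    // subscription, so the counter stays at 0 until someone subscribes.
    static Flux<String> lazySource(AtomicInteger starts) {
        return Flux.create(sink -> {
            starts.incrementAndGet();   // stands in for "start writing"
            sink.next("written");
            sink.complete();
        });
    }

    public static void main(String[] args) {
        AtomicInteger starts = new AtomicInteger();
        Flux<String> flux = lazySource(starts);
        System.out.println("after assembly: " + starts.get());   // nothing has run yet

        flux.subscribe(System.out::println);
        System.out.println("after subscribe: " + starts.get());  // callback ran once
    }
}
```

Each additional subscribe() call runs the callback again, which is why a write operation built this way is performed once per subscriber.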

Code example source: spring-projects/spring-framework

/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
 * <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Flux<DataBuffer> flux = Flux.from(source);
  return Flux.create(sink -> {
    WritableByteChannelSubscriber subscriber =
        new WritableByteChannelSubscriber(sink, channel);
    sink.onDispose(subscriber);
    flux.subscribe(subscriber);
  });
}

Code example source: resilience4j/resilience4j

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  source.publishOn(scheduler)
      .subscribe(new RateLimiterSubscriber<>(rateLimiter, actual));
}

Code example source: codecentric/spring-boot-admin

public void start() {
  // Emit a tick on a dedicated scheduler and send reminders on each tick
  this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
      .log(log.getName(), Level.FINEST)
      .doOnSubscribe(s -> log.debug("Started reminders"))
      .flatMap(i -> this.sendReminders())
      .onErrorContinue((ex, value) -> log.warn("Unexpected error while sending reminders", ex))
      .subscribe();
}

Code example source: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
      .log(log.getName(), Level.FINEST)
      .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
      .ofType(eventType)
      .cast(eventType)
      // Note: compose() was deprecated in Reactor 3.3 in favor of transformDeferred()
      .compose(this::handle)
      .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
      .subscribe();
}

Code example source: codecentric/spring-boot-admin

public void start() {
  // Replay stored events first, then continue with events published by the store
  this.subscription = this.getEventStore()
      .findAll()
      .concatWith(this.getEventStore())
      .concatMap(this::updateSnapshot)
      .subscribe();
}

Code example source: spring-projects/spring-framework

@Test // SPR-16402
public void singleSubscriberWithStrings() {
  UnicastProcessor<String> processor = UnicastProcessor.create();
  Flux.just("foo", "bar", "baz").subscribe(processor);
  MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
  bodyBuilder.asyncPart("name", processor, String.class);
  Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
  Map<String, Object> hints = Collections.emptyMap();
  this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, this.response, hints).block();
}

Code example source: spring-projects/spring-framework

@Test
public void readAndWriteAsynchronousFileChannel() throws Exception {
  Path source = Paths.get(
      DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI());
  Flux<DataBuffer> sourceFlux = DataBufferUtils.readAsynchronousFileChannel(
      () -> AsynchronousFileChannel.open(source, StandardOpenOption.READ),
      this.bufferFactory, 3);
  Path destination = Files.createTempFile("DataBufferUtilsTests", null);
  AsynchronousFileChannel channel =
      AsynchronousFileChannel.open(destination, StandardOpenOption.WRITE);
  CountDownLatch latch = new CountDownLatch(1);
  DataBufferUtils.write(sourceFlux, channel)
      .subscribe(DataBufferUtils::release,
          throwable -> fail(throwable.getMessage()),
          () -> {
            try {
              String expected = String.join("", Files.readAllLines(source));
              String result = String.join("", Files.readAllLines(destination));
              assertEquals(expected, result);
              latch.countDown();
            }
            catch (IOException e) {
              fail(e.getMessage());
            }
            finally {
              DataBufferUtils.closeChannel(channel);
            }
          });
  latch.await();
}

Code example source: spring-projects/spring-framework

@Test
public void readAndWriteByteChannel() throws Exception {
  Path source = Paths.get(
      DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI());
  Flux<DataBuffer> sourceFlux =
      DataBufferUtils
          .readByteChannel(() -> FileChannel.open(source, StandardOpenOption.READ),
              this.bufferFactory, 3);
  Path destination = Files.createTempFile("DataBufferUtilsTests", null);
  WritableByteChannel channel = Files.newByteChannel(destination, StandardOpenOption.WRITE);
  DataBufferUtils.write(sourceFlux, channel)
      .subscribe(DataBufferUtils.releaseConsumer(),
          throwable -> fail(throwable.getMessage()),
          () -> {
            try {
              String expected = String.join("", Files.readAllLines(source));
              String result = String.join("", Files.readAllLines(destination));
              assertEquals(expected, result);
            }
            catch (IOException e) {
              fail(e.getMessage());
            }
            finally {
              DataBufferUtils.closeChannel(channel);
            }
          });
}

Code example source: spring-projects/spring-framework

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
  Assert.notNull(handler, "TcpConnectionHandler is required");
  Assert.notNull(strategy, "ReconnectStrategy is required");
  if (this.stopping) {
    return handleShuttingDownConnectFailure(handler);
  }
  // Report first connect to the ListenableFuture
  MonoProcessor<Void> connectMono = MonoProcessor.create();
  this.tcpClient
      .handle(new ReactorNettyHandler(handler))
      .connect()
      .doOnNext(updateConnectMono(connectMono))
      .doOnError(updateConnectMono(connectMono))
      .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
      .flatMap(Connection::onDispose)             // post-connect issues
      .retryWhen(reconnectFunction(strategy))
      .repeatWhen(reconnectFunction(strategy))
      .subscribe();
  return new MonoToListenableFutureAdapter<>(connectMono);
}

Code example source: spring-projects/spring-framework

@Test
public void writeWritableByteChannelCancel() throws Exception {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  Flux<DataBuffer> flux = Flux.just(foo, bar);
  WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE);
  Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
  StepVerifier.create(writeResult, 1)
      .consumeNextWith(stringConsumer("foo"))
      .thenCancel()
      .verify(Duration.ofSeconds(5));
  String result = String.join("", Files.readAllLines(tempFile));
  assertEquals("foo", result);
  channel.close();
  flux.subscribe(DataBufferUtils::release);
}

Code example source: spring-projects/spring-framework

@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  Flux<DataBuffer> flux = Flux.just(foo, bar);
  AsynchronousFileChannel channel =
      AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
  Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
  StepVerifier.create(writeResult, 1)
      .consumeNextWith(stringConsumer("foo"))
      .thenCancel()
      .verify();
  String result = String.join("", Files.readAllLines(tempFile));
  assertEquals("foo", result);
  channel.close();
  flux.subscribe(DataBufferUtils::release);
}

Code example source: spring-projects/spring-framework

@Test
public void releaseConsumer() {
  DataBuffer foo = stringBuffer("foo");
  DataBuffer bar = stringBuffer("bar");
  DataBuffer baz = stringBuffer("baz");
  Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
  flux.subscribe(DataBufferUtils.releaseConsumer());
  assertReleased(foo);
  assertReleased(bar);
  assertReleased(baz);
}
