reactor.core.publisher.Flux.create()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(11.9k)|赞(0)|评价(0)|浏览(300)

本文整理了Java中reactor.core.publisher.Flux.create()方法的一些代码示例,展示了Flux.create()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.create()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:create

Flux.create介绍

[英]Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.

This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).

For example:

Flux.<String>create(emitter -> { 
ActionListener al = e -> { 
emitter.next(textField.getText()); 
}; 
// without cleanup support: 
button.addActionListener(al); 
// with cleanup support: 
button.addActionListener(al); 
emitter.onDispose(() -> { 
button.removeListener(al); 
}); 
});

[中]以编程方式创建具有通过FluxSink API以同步或异步方式发射多个元素能力的通量。这包括从多个线程发射元素。
如果您想要适应其他多值异步API,并且不担心取消和背压(如果下游无法跟上,则通过缓冲所有信号来处理),则此流量工厂非常有用。
例如:

Flux.<String>create(emitter -> { 
ActionListener al = e -> { 
emitter.next(textField.getText()); 
}; 
// without cleanup support: 
button.addActionListener(al); 
// with cleanup support: 
button.addActionListener(al); 
emitter.onDispose(() -> { 
button.removeListener(al); 
}); 
});

代码示例

代码示例来源:origin: spring-projects/spring-framework

/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
 * Does <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
    Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Flux<DataBuffer> flux = Flux.from(source);
  return Flux.create(sink -> {
    AsynchronousFileChannelWriteCompletionHandler completionHandler =
        new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
    sink.onDispose(completionHandler);
    flux.subscribe(completionHandler);
  });
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
 * <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
  Assert.notNull(source, "'source' must not be null");
  Assert.notNull(channel, "'channel' must not be null");
  Flux<DataBuffer> flux = Flux.from(source);
  return Flux.create(sink -> {
    WritableByteChannelSubscriber subscriber =
        new WritableByteChannelSubscriber(sink, channel);
    sink.onDispose(subscriber);
    flux.subscribe(subscriber);
  });
}

代码示例来源:origin: lettuce-io/lettuce-core

private static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, Optional<ScanArgs> scanArgs) {
  LettuceAssert.notNull(commands, "RedisKeyCommands must not be null");
  return Flux.create(sink -> {
    Mono<KeyScanCursor<K>> res = scanArgs.map(commands::scan).orElseGet(commands::scan);
    scan(sink, res, c -> scanArgs.map(it -> commands.scan(c, it)).orElseGet(() -> commands.scan(c)), //
        KeyScanCursor::getKeys);
  });
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
    long position, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
  ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
  Flux<DataBuffer> result = Flux.using(channelSupplier,
      channel -> Flux.create(sink -> {
        AsynchronousFileChannelReadCompletionHandler completionHandler =
            new AsynchronousFileChannelReadCompletionHandler(channel,
                sink, position, dataBufferFactory, bufferSize);
        channel.read(byteBuffer, position, dataBuffer, completionHandler);
        sink.onDispose(completionHandler::dispose);
      }),
      DataBufferUtils::closeChannel);
  return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

代码示例来源:origin: lettuce-io/lettuce-core

private static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key,
    Optional<ScanArgs> scanArgs) {
  LettuceAssert.notNull(commands, "RedisSortedSetReactiveCommands must not be null");
  LettuceAssert.notNull(key, "Key must not be null");
  return Flux.create(sink -> {
    Mono<ScoredValueScanCursor<V>> res = scanArgs.map(it -> commands.zscan(key, it)).orElseGet(
        () -> commands.zscan(key));
    scan(sink, res, c -> scanArgs.map(it -> commands.zscan(key, c, it)).orElseGet(() -> commands.zscan(key, c)), //
        ScoredValueScanCursor::getValues);
  });
}

代码示例来源:origin: lettuce-io/lettuce-core

private static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {
  LettuceAssert.notNull(commands, "RedisSetReactiveCommands must not be null");
  LettuceAssert.notNull(key, "Key must not be null");
  return Flux.create(sink -> {
    Mono<ValueScanCursor<V>> res = scanArgs.map(it -> commands.sscan(key, it)).orElseGet(() -> commands.sscan(key));
    scan(sink, res, c -> scanArgs.map(it -> commands.sscan(key, c, it)).orElseGet(() -> commands.sscan(key, c)), //
        ValueScanCursor::getValues);
  });
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory))
      .doOnNext(part -> {
        if (!Hints.isLoggingSuppressed(hints)) {
          LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
              (isEnableLoggingRequestDetails() ?
                  LogFormatUtils.formatValue(part, !traceOn) :
                  "parts '" + part.name() + "' (content masked)"));
        }
      });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateBufferedError2() {
  Flux<String> created = Flux.create(s -> {
    s.error(new Exception("test"));
  });
  StepVerifier.create(created)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateDropError2() {
  Flux<String> created = Flux.create(s -> {
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.DROP);
  StepVerifier.create(created)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateSerializedError2() {
  Flux<String> created = Flux.create(s -> {
    s.error(new Exception("test"));
  });
  StepVerifier.create(created)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateDropEmpty() {
  Flux<String> created =
      Flux.create(FluxSink::complete, FluxSink.OverflowStrategy.DROP);
  StepVerifier.create(created)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateErrorError2() {
  Flux<String> created = Flux.create(s -> {
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.ERROR);
  StepVerifier.create(created)
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateIgnoreError2() {
  Flux<String> created = Flux.create(s -> {
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.IGNORE);
  StepVerifier.create(created)
        .verifyErrorMessage("test");
}

代码示例来源:origin: org.springframework/spring-web

@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
  return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory))
      .doOnNext(part -> {
        if (!Hints.isLoggingSuppressed(hints)) {
          LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
              (isEnableLoggingRequestDetails() ?
                  LogFormatUtils.formatValue(part, !traceOn) :
                  "parts '" + part.name() + "' (content masked)"));
        }
      });
}

代码示例来源:origin: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(Flux.create(s -> IntStream.range(0, 10).forEach(i -> s.next(s
      .currentContext()
                              .get(AtomicInteger.class)
                              .incrementAndGet())))
              .take(10)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateDropError() {
  Flux<String> created = Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.DROP);
  StepVerifier.create(created)
        .expectNext("test1", "test2", "test3")
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateErrorError() {
  Flux<String> created = Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.ERROR);
  StepVerifier.create(created)
        .expectNext("test1", "test2", "test3")
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateLatestError() {
  Flux<String> created = Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.error(new Exception("test"));
  }, FluxSink.OverflowStrategy.LATEST);
  StepVerifier.create(created)
        .expectNext("test1", "test2", "test3")
        .verifyErrorMessage("test");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void switchFromCreateError2() {
  Flux<String> source = Flux.create(s -> {
        s.next("Three");
        s.next("Two");
        s.next("One");
        s.error(new Exception());
      });
  StepVerifier.create(source.onErrorReturn("Zero"))
        .expectNext("Three", "Two", "One", "Zero")
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void fluxCreateError2(){
  StepVerifier.create(Flux.create(s -> {
    s.next("test1");
    s.next("test2");
    s.next("test3");
    s.complete();
  }, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
        .expectNext("test1", "test2", "test3")
        .verifyComplete();
}

相关文章

微信公众号

最新文章

更多

Flux类方法