
x33g5p2x  于2022-01-19 转载在 其他  



[英]Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.

Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal even.

For an asynchronous version of the cleanup, with distinct path for onComplete, onError and cancel terminations, see #usingWhen(Publisher,Function,Function,Function,Function).


代码示例来源:origin: reactor/reactor-core

 * Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a
 * Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or
 * the Subscriber cancels.
 * <p>
 * Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer
 * may override the terminal even.
 * <p>
 * <img class="marble" src="doc-files/marbles/usingForFlux.svg" alt="">
 * <p>
 * For an asynchronous version of the cleanup, with distinct path for onComplete, onError
 * and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, Function, Function)}.
 * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
 * @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
 * @param resourceCleanup a resource cleanup callback invoked on completion
 * @param <T> emitted type
 * @param <D> resource type
 * @return a new {@link Flux} built around a disposable resource
 * @see #usingWhen(Publisher, Function, Function, Function, Function)
 * @see #usingWhen(Publisher, Function, Function)
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
    Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) {
  return using(resourceSupplier, sourceSupplier, resourceCleanup, true);

代码示例来源:origin: spring-projects/spring-framework

 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
public static Flux<DataBuffer> readByteChannel(
    Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  return Flux.using(channelSupplier,
      channel -> {
        ReadableByteChannelGenerator generator =
            new ReadableByteChannelGenerator(channel, dataBufferFactory,
        return Flux.generate(generator);
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

代码示例来源:origin: org.springframework/spring-core

 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
public static Flux<DataBuffer> readByteChannel(
    Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  return Flux.using(channelSupplier,
      channel -> {
        ReadableByteChannelGenerator generator =
            new ReadableByteChannelGenerator(channel, dataBufferFactory,
        return Flux.generate(generator);
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

代码示例来源:origin: spring-projects/spring-framework

 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
    long position, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
  ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
  Flux<DataBuffer> result = Flux.using(channelSupplier,
      channel -> Flux.create(sink -> {
        AsynchronousFileChannelReadCompletionHandler completionHandler =
            new AsynchronousFileChannelReadCompletionHandler(channel,
                sink, position, dataBufferFactory, bufferSize);, position, dataBuffer, completionHandler);
  return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void sourceFactoryNull() {
  Flux.using(() -> 1, null, r -> {
  }, false);

代码示例来源:origin: reactor/reactor-core

protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> Flux.using(() -> 0, c -> f, c -> {})),
      scenario(f -> Flux.using(() -> 0, c -> f, c -> {}, false))

代码示例来源:origin: org.springframework/spring-core

 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
    long position, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(position >= 0, "'position' must be >= 0");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
  ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
  Flux<DataBuffer> result = Flux.using(channelSupplier,
      channel -> Flux.create(sink -> {
        AsynchronousFileChannelReadCompletionHandler completionHandler =
            new AsynchronousFileChannelReadCompletionHandler(channel,
                sink, position, dataBufferFactory, bufferSize);, position, dataBuffer, completionHandler);
  return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void resourceSupplierNull() {
  Flux.using(null, r -> Flux.empty(), r -> {
  }, false);

代码示例来源:origin: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void resourceCleanupNull() {
  Flux.using(() -> 1, r -> Flux.empty(), null, false);

代码示例来源:origin: reactor/reactor-core

public void errorHandlingUsing() {
  AtomicBoolean isDisposed = new AtomicBoolean();
  Disposable disposableInstance = new Disposable() {
    public void dispose() {
      isDisposed.set(true); // <4>
    public String toString() {
      return "DISPOSABLE";
  Flux<String> flux =
      () -> disposableInstance, // <1>
      disposable -> Flux.just(disposable.toString()), // <2>
      Disposable::dispose // <3>

代码示例来源:origin: reactor/reactor-core

protected List<Scenario<String, String>> scenarios_operatorError() {
  return Arrays.asList(
      scenario(f -> Flux.using(() -> {
        throw exception();
      }, c -> f, c -> {}))
      scenario(f -> Flux.using(() -> 0, c -> null, c -> {}))
      scenario(f -> Flux.using(() -> 0, c -> {
        throw exception();
      }, c -> {}))
      /*scenario(f -> Flux.using(() -> 0, c -> f, c -> {
        throw exception();
      .verifier(step -> {
        try {
          step.expectNext(item(0), item(1), item(2)).verifyErrorMessage
        catch (Exception t){

代码示例来源:origin: reactor/reactor-core

public void normalEager() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Flux.using(() -> 1, r -> Flux.range(r, 10), cleanup::set)
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  Assert.assertEquals(1, cleanup.get());

代码示例来源:origin: reactor/reactor-core

public void factoryReturnsNull() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Flux.<Integer, Integer>using(() -> 1,
      r -> null,
  Assert.assertEquals(1, cleanup.get());

代码示例来源:origin: reactor/reactor-core

public void factoryThrowsEager() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Flux.using(() -> 1, r -> {
    throw new RuntimeException("forced failure");
  }, cleanup::set, false)
   .assertErrorMessage("forced failure");
  Assert.assertEquals(1, cleanup.get());

代码示例来源:origin: reactor/reactor-core

public void resourceThrowsEager() {
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Flux.using(() -> {
    throw new RuntimeException("forced failure");
  }, r -> Flux.range(1, 10), cleanup::set, false)
   .assertErrorMessage("forced failure");
  Assert.assertEquals(0, cleanup.get());

代码示例来源:origin: reactor/reactor-core

public void normal() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  Flux.using(() -> 1, r -> Flux.range(r, 10), cleanup::set, false)
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  Assert.assertEquals(1, cleanup.get());

代码示例来源:origin: reactor/reactor-core

Flux.using(() -> 1, r -> {
  if (fail) {
    return Flux.error(new RuntimeException("forced failure"));

代码示例来源:origin: reactor/reactor-core

public void subscriberCancels() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger cleanup = new AtomicInteger();
  DirectProcessor<Integer> tp = DirectProcessor.create();
  Flux.using(() -> 1, r -> tp, cleanup::set, true)
  Assert.assertTrue("No subscriber?", tp.hasDownstreams());
  Assert.assertFalse("Has subscriber?", tp.hasDownstreams());
  Assert.assertEquals(1, cleanup.get());

代码示例来源:origin: org.apache.servicemix.bundles/org.apache.servicemix.bundles.spring-core

 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
public static Flux<DataBuffer> readByteChannel(
    Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  return Flux.using(channelSupplier,
      channel -> {
        ReadableByteChannelGenerator generator =
            new ReadableByteChannelGenerator(channel, dataBufferFactory,
        return Flux.generate(generator);
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

代码示例来源:origin: io.projectreactor/reactor-netty

default ByteBufEncodedFlux receive() {
  return ByteBufEncodedFlux.encoded(receiveParts().onBackpressureError()
                          .concatMap(parts -> parts.aggregate()
                          .flatMap(bb ->
                              Flux.using(() -> bb,




