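This article collects code examples for the reactor.core.publisher.Flux.create() method in Java and shows how Flux.create() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as a useful reference. Details of Flux.create() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: create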
Programmatically create a Flux with the capability of emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API. This includes emitting elements from multiple threads.
This Flux factory is useful if one wants to adapt some other multi-valued async API and not worry about cancellation and backpressure (which is handled by buffering all signals if the downstream can't keep up).
For example:
Flux.<String>create(emitter -> {
    ActionListener al = e -> {
        emitter.next(textField.getText());
    };
    // Option 1, without cleanup support:
    button.addActionListener(al);
    // Option 2, with cleanup support (use instead of option 1):
    button.addActionListener(al);
    emitter.onDispose(() -> {
        button.removeActionListener(al);
    });
});
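The listener-bridging pattern described above can be sketched without Swing. The following is a minimal illustration, assuming reactor-core is on the classpath; `ListenerBridgeDemo` and `EventSource` are hypothetical names invented for this sketch and stand in for whatever callback-based API is being adapted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class ListenerBridgeDemo {

    /** Hypothetical callback-based API, standing in for the button in the Javadoc example. */
    static class EventSource {
        final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void addListener(Consumer<String> listener) { listeners.add(listener); }
        void removeListener(Consumer<String> listener) { listeners.remove(listener); }
        void fire(String value) { listeners.forEach(l -> l.accept(value)); }
    }

    /** Bridges the callback API to a Flux; the listener is removed when the sink is disposed. */
    static Flux<String> events(EventSource source) {
        return Flux.create(sink -> {
            Consumer<String> listener = sink::next;
            source.addListener(listener);
            sink.onDispose(() -> source.removeListener(listener));
        });
    }

    /** Subscribes, fires two events, cancels, then fires a third that nobody receives. */
    static List<String> demo(EventSource source) {
        List<String> received = new ArrayList<>();
        Disposable subscription = events(source).subscribe(received::add);
        source.fire("a");
        source.fire("b");
        subscription.dispose(); // cancellation runs the onDispose callback
        source.fire("c");       // no listener is registered any more
        return received;
    }

    public static void main(String[] args) {
        EventSource source = new EventSource();
        System.out.println(demo(source));               // [a, b]
        System.out.println(source.listeners.isEmpty()); // true
    }
}
```

Registering the cleanup through `onDispose` means the listener is removed whether the sequence ends by completion, error, or cancellation, which is why the third event never reaches the subscriber.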
Code example source: spring-projects/spring-framework
/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code AsynchronousFileChannel}.
 * Does <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
        Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {
    Assert.notNull(source, "'source' must not be null");
    Assert.notNull(channel, "'channel' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");
    Flux<DataBuffer> flux = Flux.from(source);
    return Flux.create(sink -> {
        AsynchronousFileChannelWriteCompletionHandler completionHandler =
                new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
        sink.onDispose(completionHandler);
        flux.subscribe(completionHandler);
    });
}
Code example source: spring-projects/spring-framework
/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given {@code WritableByteChannel}. Does
 * <strong>not</strong> close the channel when the flux is terminated, and does
 * <strong>not</strong> {@linkplain #release(DataBuffer) release} the data buffers in the
 * source. If releasing is required, then subscribe to the returned {@code Flux} with a
 * {@link #releaseConsumer()}.
 * <p>Note that the writing process does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
    Assert.notNull(source, "'source' must not be null");
    Assert.notNull(channel, "'channel' must not be null");
    Flux<DataBuffer> flux = Flux.from(source);
    return Flux.create(sink -> {
        WritableByteChannelSubscriber subscriber =
                new WritableByteChannelSubscriber(sink, channel);
        sink.onDispose(subscriber);
        flux.subscribe(subscriber);
    });
}
Code example source: lettuce-io/lettuce-core
private static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, Optional<ScanArgs> scanArgs) {
    LettuceAssert.notNull(commands, "RedisKeyCommands must not be null");
    return Flux.create(sink -> {
        Mono<KeyScanCursor<K>> res = scanArgs.map(commands::scan).orElseGet(commands::scan);
        scan(sink, res, c -> scanArgs.map(it -> commands.scan(c, it)).orElseGet(() -> commands.scan(c)), //
                KeyScanCursor::getKeys);
    });
}
Code example source: spring-projects/spring-framework
/**
 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
        long position, DataBufferFactory dataBufferFactory, int bufferSize) {
    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
    DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
    ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
    Flux<DataBuffer> result = Flux.using(channelSupplier,
            channel -> Flux.create(sink -> {
                AsynchronousFileChannelReadCompletionHandler completionHandler =
                        new AsynchronousFileChannelReadCompletionHandler(channel,
                                sink, position, dataBufferFactory, bufferSize);
                channel.read(byteBuffer, position, dataBuffer, completionHandler);
                sink.onDispose(completionHandler::dispose);
            }),
            DataBufferUtils::closeChannel);
    return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
Code example source: lettuce-io/lettuce-core
private static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key,
        Optional<ScanArgs> scanArgs) {
    LettuceAssert.notNull(commands, "RedisSortedSetReactiveCommands must not be null");
    LettuceAssert.notNull(key, "Key must not be null");
    return Flux.create(sink -> {
        Mono<ScoredValueScanCursor<V>> res = scanArgs.map(it -> commands.zscan(key, it)).orElseGet(
                () -> commands.zscan(key));
        scan(sink, res, c -> scanArgs.map(it -> commands.zscan(key, c, it)).orElseGet(() -> commands.zscan(key, c)), //
                ScoredValueScanCursor::getValues);
    });
}
Code example source: lettuce-io/lettuce-core
private static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {
    LettuceAssert.notNull(commands, "RedisSetReactiveCommands must not be null");
    LettuceAssert.notNull(key, "Key must not be null");
    return Flux.create(sink -> {
        Mono<ValueScanCursor<V>> res = scanArgs.map(it -> commands.sscan(key, it)).orElseGet(() -> commands.sscan(key));
        scan(sink, res, c -> scanArgs.map(it -> commands.sscan(key, c, it)).orElseGet(() -> commands.sscan(key, c)), //
                ValueScanCursor::getValues);
    });
}
Code example source: spring-projects/spring-framework
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
    return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory))
            .doOnNext(part -> {
                if (!Hints.isLoggingSuppressed(hints)) {
                    LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
                            (isEnableLoggingRequestDetails() ?
                            LogFormatUtils.formatValue(part, !traceOn) :
                            "parts '" + part.name() + "' (content masked)"));
                }
            });
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateBufferedError2() {
    Flux<String> created = Flux.create(s -> {
        s.error(new Exception("test"));
    });
    StepVerifier.create(created)
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateDropError2() {
    Flux<String> created = Flux.create(s -> {
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.DROP);
    StepVerifier.create(created)
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateSerializedError2() {
    Flux<String> created = Flux.create(s -> {
        s.error(new Exception("test"));
    });
    StepVerifier.create(created)
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateDropEmpty() {
    Flux<String> created =
            Flux.create(FluxSink::complete, FluxSink.OverflowStrategy.DROP);
    StepVerifier.create(created)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateErrorError2() {
    Flux<String> created = Flux.create(s -> {
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.ERROR);
    StepVerifier.create(created)
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateIgnoreError2() {
    Flux<String> created = Flux.create(s -> {
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.IGNORE);
    StepVerifier.create(created)
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void contextTest() {
    // Note: subscriberContext(...) was deprecated in Reactor 3.4 and removed in 3.5;
    // newer versions use contextWrite(...) instead.
    StepVerifier.create(Flux.create(s -> IntStream.range(0, 10).forEach(i -> s.next(s
                    .currentContext()
                    .get(AtomicInteger.class)
                    .incrementAndGet())))
            .take(10)
            .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                    new AtomicInteger())))
            .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateDropError() {
    Flux<String> created = Flux.create(s -> {
        s.next("test1");
        s.next("test2");
        s.next("test3");
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.DROP);
    StepVerifier.create(created)
            .expectNext("test1", "test2", "test3")
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateErrorError() {
    Flux<String> created = Flux.create(s -> {
        s.next("test1");
        s.next("test2");
        s.next("test3");
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.ERROR);
    StepVerifier.create(created)
            .expectNext("test1", "test2", "test3")
            .verifyErrorMessage("test");
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateLatestError() {
    Flux<String> created = Flux.create(s -> {
        s.next("test1");
        s.next("test2");
        s.next("test3");
        s.error(new Exception("test"));
    }, FluxSink.OverflowStrategy.LATEST);
    StepVerifier.create(created)
            .expectNext("test1", "test2", "test3")
            .verifyErrorMessage("test");
}
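The tests above pass an explicit FluxSink.OverflowStrategy, but their subscribers request everything, so no overflow ever happens. The following is a minimal sketch of what the strategy changes when the downstream is slower than the emitter, assuming reactor-core is on the classpath; `OverflowStrategyDemo` and `receive` are hypothetical names invented for this illustration. The subscriber requests 2 elements up front and asks for more only after the emitter has already pushed 5 elements and completed.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class OverflowStrategyDemo {

    /** Emits 1..5 synchronously while the subscriber has only requested 2 elements. */
    static List<Integer> receive(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();

        Flux<Integer> flux = Flux.create(sink -> {
            for (int i = 1; i <= 5; i++) {
                sink.next(i);
            }
            sink.complete();
        }, strategy);

        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // initial demand is smaller than what the emitter pushes
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        flux.subscribe(subscriber);
        subscriber.request(10); // late demand: only retained elements can still arrive
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receive(FluxSink.OverflowStrategy.BUFFER)); // [1, 2, 3, 4, 5]
        System.out.println(receive(FluxSink.OverflowStrategy.DROP));   // [1, 2]
    }
}
```

With BUFFER the three extra elements are held until demand arrives, whereas with DROP they are discarded at the moment they are emitted without demand. ERROR is left out of this sketch because it would signal an error to a subscriber that does not handle one.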
Code example source: reactor/reactor-core
@Test
public void switchFromCreateError2() {
    Flux<String> source = Flux.create(s -> {
        s.next("Three");
        s.next("Two");
        s.next("One");
        s.error(new Exception());
    });
    StepVerifier.create(source.onErrorReturn("Zero"))
            .expectNext("Three", "Two", "One", "Zero")
            .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void fluxCreateError2() {
    StepVerifier.create(Flux.create(s -> {
        s.next("test1");
        s.next("test2");
        s.next("test3");
        s.complete();
    }, FluxSink.OverflowStrategy.ERROR).publishOn(Schedulers.parallel()))
            .expectNext("test1", "test2", "test3")
            .verifyComplete();
}