本文整理了Java中reactor.core.publisher.Flux.fromArray()
方法的一些代码示例,展示了Flux.fromArray()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.fromArray()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:fromArray
[英]Create a Flux that emits the items contained in the provided array.
[中]创建一个通量,发射所提供数组中包含的项。
代码示例来源:origin: reactor/reactor-core
/**
* Create a {@link Flux} that emits the provided elements and then completes.
* <p>
* <img class="marble" src="doc-files/marbles/justMultiple.svg" alt="">
*
* @param data the elements to emit, as a vararg
* @param <T> the emitted data type
*
* @return a new {@link Flux}
*/
@SafeVarargs
public static <T> Flux<T> just(T... data) {
return fromArray(data);
}
代码示例来源:origin: reactor/reactor-core
/**
* Concatenates the values to the end of the {@link Flux}
* <p>
* <img class="marble" src="doc-files/marbles/concatWithValues.svg" alt="">
*
* @param values The values to concatenate
*
* @return a new {@link Flux} concatenating all source sequences
*/
@SafeVarargs
public final Flux<T> concatWithValues(T... values) {
return concatWith(Flux.fromArray(values));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SuppressWarnings("unchecked")
public Mono<Long> remove(H key, Object... hashKeys) {
Assert.notNull(key, "Key must not be null!");
Assert.notNull(hashKeys, "Hash keys must not be null!");
Assert.notEmpty(hashKeys, "Hash keys must not be empty!");
Assert.noNullElements(hashKeys, "Hash keys must not contain null elements!");
return createMono(connection -> Flux.fromArray(hashKeys) //
.map(o -> (HK) o).map(this::rawHashKey) //
.collectList() //
.flatMap(hks -> connection.hDel(rawKey(key), hks)));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SafeVarargs
public final Mono<Boolean> union(K destination, K... sourceKeys) {
Assert.notNull(destination, "Destination key must not be null!");
Assert.notEmpty(sourceKeys, "Source keys must not be null or empty!");
Assert.noNullElements(sourceKeys, "Source keys must not contain null elements!");
return createMono(connection -> Flux.fromArray(sourceKeys) //
.map(this::rawKey) //
.collectList() //
.flatMap(serialized -> connection.pfMerge(rawKey(destination), serialized)));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SafeVarargs
public final Mono<List<String>> hash(K key, V... members) {
Assert.notNull(key, "Key must not be null!");
Assert.notEmpty(members, "Members must not be null or empty!");
Assert.noNullElements(members, "Members must not contain null elements!");
return createMono(connection -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.geoHash(rawKey(key), serialized)));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SafeVarargs
public final Mono<Long> add(K key, V... values) {
Assert.notNull(key, "Key must not be null!");
Assert.notEmpty(values, "Values must not be null or empty!");
Assert.noNullElements(values, "Values must not contain null elements!");
return createMono(connection -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serializedValues -> connection.pfAdd(rawKey(key), serializedValues)));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SafeVarargs
public final Mono<Long> remove(K key, V... members) {
Assert.notNull(key, "Key must not be null!");
Assert.notEmpty(members, "Members must not be null or empty!");
Assert.noNullElements(members, "Members must not contain null elements!");
return template.createMono(connection -> Flux.fromArray(members) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.zSetCommands().zRem(rawKey(key), serialized)));
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = NullPointerException.class)
public void arrayNull() {
Flux.fromArray((Integer[]) null);
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SafeVarargs
public final Mono<Long> unlink(K... keys) {
Assert.notNull(keys, "Keys must not be null!");
Assert.notEmpty(keys, "Keys must not be empty!");
Assert.noNullElements(keys, "Keys must not contain null elements!");
if (keys.length == 1) {
return createMono(connection -> connection.keyCommands().unlink(rawKey(keys[0])));
}
Mono<List<ByteBuffer>> listOfKeys = Flux.fromArray(keys).map(this::rawKey).collectList();
return createMono(connection -> listOfKeys.flatMap(rawKeys -> connection.keyCommands().mUnlink(rawKeys)));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Mono<Long> add(K key, V... values) {
Assert.notNull(key, "Key must not be null!");
if (values.length == 1) {
return createMono(connection -> connection.sAdd(rawKey(key), rawValue(values[0])));
}
return createMono(connection -> Flux.fromArray(values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.sAdd(rawKey(key), serialized)));
}
代码示例来源:origin: reactor/reactor-core
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
return source
.flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
.switchIfEmpty(fallback);
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
@SuppressWarnings("unchecked")
public Mono<Long> remove(K key, Object... values) {
Assert.notNull(key, "Key must not be null!");
if (values.length == 1) {
return createMono(connection -> connection.sRem(rawKey(key), rawValue((V) values[0])));
}
return createMono(connection -> Flux.fromArray((V[]) values) //
.map(this::rawValue) //
.collectList() //
.flatMap(serialized -> connection.sRem(rawKey(key), serialized)));
}
代码示例来源:origin: spring-projects/spring-framework
@Parameters(name = "client[{0}] - server [{1}]")
public static Object[][] arguments() throws IOException {
WebSocketClient[] clients = new WebSocketClient[] {
new TomcatWebSocketClient(),
new JettyWebSocketClient(),
new ReactorNettyWebSocketClient(),
new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))
};
Map<HttpServer, Class<?>> servers = new LinkedHashMap<>();
servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class);
servers.put(new JettyHttpServer(), JettyConfig.class);
servers.put(new ReactorHttpServer(), ReactorNettyConfig.class);
servers.put(new UndertowHttpServer(), UndertowConfig.class);
Flux<WebSocketClient> f1 = Flux.fromArray(clients).concatMap(c -> Flux.just(c).repeat(servers.size()));
Flux<HttpServer> f2 = Flux.fromIterable(servers.keySet()).repeat(clients.length);
Flux<Class<?>> f3 = Flux.fromIterable(servers.values()).repeat(clients.length);
return Flux.zip(f1, f2, f3).map(Tuple3::toArray).collectList().block()
.toArray(new Object[clients.length * servers.size()][2]);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testBufferPredicateUntilIncludesBoundaryLastAfter() {
String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
Flux<List<String>> colors = Flux
.fromArray(colorSeparated)
.bufferUntil(val -> val.equals("#"), false)
.log();
StepVerifier.create(colors)
.consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue", "#")))
.consumeNextWith(l2 -> Assert.assertThat(l2, contains("green", "green", "#")))
.consumeNextWith(l3 -> Assert.assertThat(l3, contains("blue", "cyan")))
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testBufferPredicateWhileDoesntIncludeBoundary() {
String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
Flux<List<String>> colors = Flux
.fromArray(colorSeparated)
.bufferWhile(val -> !val.equals("#"))
.log();
StepVerifier.create(colors)
.consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue")))
.consumeNextWith(l2 -> Assert.assertThat(l2, contains("green", "green")))
.consumeNextWith(l3 -> Assert.assertThat(l3, contains("blue", "cyan")))
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testBufferPredicateUntilIncludesBoundaryLast() {
String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
Flux<List<String>> colors = Flux
.fromArray(colorSeparated)
.bufferUntil(val -> val.equals("#"))
.log();
StepVerifier.create(colors)
.consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue", "#")))
.consumeNextWith(l2 -> Assert.assertThat(l2, contains("green", "green", "#")))
.consumeNextWith(l3 -> Assert.assertThat(l3, contains("blue", "cyan")))
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testBufferPredicateUntilCutBeforeIncludesBoundaryFirst() {
String[] colorSeparated = new String[]{"red", "green", "blue", "#", "green", "green", "#", "blue", "cyan"};
Flux<List<String>> colors = Flux
.fromArray(colorSeparated)
.bufferUntil(val -> val.equals("#"), true)
.log();
StepVerifier.create(colors)
.thenRequest(1)
.consumeNextWith(l1 -> Assert.assertThat(l1, contains("red", "green", "blue")))
.consumeNextWith(l2 -> Assert.assertThat(l2, contains("#", "green", "green")))
.consumeNextWith(l3 -> Assert.assertThat(l3, contains("#", "blue", "cyan")))
.expectComplete()
.verify();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testForceShutdownWhileWaitingForRequest() throws InterruptedException {
TopicProcessor<String> processor = TopicProcessor.<String>builder().name("processor").bufferSize(4).build();
Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);
AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);
subscriber.request(1);
Thread.sleep(250);
processor.forceShutdown();
assertTrue(processor.awaitAndShutdown(Duration.ofSeconds(1)));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testForceShutdownAfterShutdown() throws InterruptedException {
TopicProcessor<String> processor = TopicProcessor.<String>builder().name("processor").bufferSize(4).build();
Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);
AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);
subscriber.request(1);
Thread.sleep(250);
processor.shutdown();
assertFalse(processor.awaitAndShutdown(Duration.ofMillis(400)));
processor.forceShutdown();
assertTrue(processor.awaitAndShutdown(Duration.ofMillis(400)));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void testShutdownSuccessfulAfterAllDataIsRequested() throws InterruptedException {
TopicProcessor<String> processor = TopicProcessor.<String>builder().name("processor").bufferSize(4).build();
Publisher<String>
publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);
AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);
subscriber.request(1);
Thread.sleep(250);
processor.shutdown();
assertFalse(processor.awaitAndShutdown(Duration.ofMillis(250)));
subscriber.request(4);
assertTrue(processor.awaitAndShutdown(Duration.ofMillis(250)));
}
内容来源于网络,如有侵权,请联系作者删除!