reactor.core.publisher.Flux.compose()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.8k)|赞(0)|评价(0)|浏览(179)

本文整理了Java中reactor.core.publisher.Flux.compose()方法的一些代码示例,展示了Flux.compose()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.compose()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:compose

Flux.compose介绍

[英]Defer the transformation of this Flux in order to generate a target Flux type. A transformation will occur for each Subscriber. For instance:

flux.compose(original -> original.log());

[中]推迟该通量的变换,以生成目标通量类型。每个订阅服务器都将发生转换。例如:

flux.compose(original -> original.log());

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

public void start() {
  subscription = Flux.from(publisher)
            .log(log.getName(), Level.FINEST)
            .doOnSubscribe(s -> log.debug("Subscribed to {} events", eventType))
            .ofType(eventType)
            .cast(eventType)
            .compose(this::handle)
            .onErrorContinue((ex, value) -> log.warn("Unexpected error while handling {}", value, ex))
            .subscribe();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void advancedTransform() {
  AtomicInteger ai = new AtomicInteger();
  Function<Flux<String>, Flux<String>> filterAndMap = f -> {
    if (ai.incrementAndGet() == 1) {
      return f.filter(color -> !color.equals("orange"))
          .map(String::toUpperCase);
    }
    return f.filter(color -> !color.equals("purple"))
        .map(String::toUpperCase);
  };
  Flux<String> composedFlux =
      Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .doOnNext(System.out::println)
        .compose(filterAndMap);
  composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
  composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void perSubscriberState() {
  
  Flux<Integer> source = Flux.just(10).compose(f -> {
    AtomicInteger value = new AtomicInteger();
    return f.map(v -> v + value.incrementAndGet());
  });
  
  
  for (int i = 0; i < 10; i++) {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    
    source.subscribe(ts);
    
    ts.assertValues(11)
    .assertComplete()
    .assertNoError();
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void composerThrows() {
  Flux<Integer> source = Flux.just(10).compose(f -> {
    throw new RuntimeException("Forced failure");
  });
  
  for (int i = 0; i < 10; i++) {
    AssertSubscriber<Integer> ts = AssertSubscriber.create();
    
    source.subscribe(ts);
    
    ts.assertNoValues()
    .assertNotComplete()
    .assertError(RuntimeException.class);
  }
  
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void composerReturnsNull() {
    Flux<Integer> source = Flux.just(10).compose(f -> {
      return null;
    });
    
    for (int i = 0; i < 10; i++) {
      AssertSubscriber<Integer> ts = AssertSubscriber.create();
      
      source.subscribe(ts);
      
      ts.assertNoValues()
      .assertNotComplete()
      .assertError(NullPointerException.class);
    }
    
  }
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param transformer
 * @return
 * @see reactor.core.publisher.Flux#compose(java.util.function.Function)
 */
public final <V> Flux<V> compose(Function<? super Flux<T>, ? extends Publisher<V>> transformer) {
  return boxed.compose(transformer);
}
/**

代码示例来源:origin: spring-cloud/spring-cloud-function

if (supplier instanceof FluxSupplier) {
  FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>)b);
  return (Supplier<Mono<Void>>) () -> Mono.from(supplier.get().compose(v -> fConsumer.apply(supplier.get())));

代码示例来源:origin: org.springframework.cloud/spring-cloud-function-context

if (supplier instanceof FluxSupplier) {
  FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>)b);
  return (Supplier<Mono<Void>>) () -> Mono.from(supplier.get().compose(v -> fConsumer.apply(supplier.get())));

代码示例来源:origin: org.cloudfoundry/cloudfoundry-operations

private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, String applicationId, Boolean recent) {
  if (Optional.ofNullable(recent).orElse(false)) {
    return requestLogsRecent(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .collectSortedList(LOG_MESSAGE_COMPARATOR)
      .flatMapIterable(d -> d);
  } else {
    return requestLogsStream(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void errorDedicated() {
  Flux.error(new IOException())
  .compose(composeIfNonEmptyDedicated(f -> {
    System.out.println("Composed!");
    return f.doOnNext(System.out::println).then();
  }))
  .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void error() {
  Flux.error(new IOException())
  .compose(composeIfNonEmpty(f -> {
    System.out.println("Composed!");
    return f.doOnNext(System.out::println).then();
  }))
  .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) {
  Flux.just(1, 2, 3, 6, 7, 10)
  .flatMap(v -> Mono.delay(Duration.ofMillis(v * 1000)).doOnNext(w -> System.out.println("T=" + v)).map(w -> v))
  .compose(f -> delayedBufferAfterFirst(f, Duration.ofSeconds(2)))
  .doOnNext(System.out::println)
  .blockLast();
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void simpleDedicated() throws Exception {
  Flux.range(1, 10)
  .hide()
  .compose(composeIfNonEmptyDedicated(f -> {
    System.out.println("Composed!");
    return f.doOnNext(System.out::println).then();
  }))
  .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void simple() throws Exception {
  Flux.range(1, 10)
  .hide()
  .compose(composeIfNonEmpty(f -> {
    System.out.println("Composed!");
    return f.doOnNext(System.out::println).then();
  }))
  .subscribe(v -> { }, Throwable::printStackTrace, () -> System.out.println("Done"));
}

相关文章

微信公众号

最新文章

更多

Flux类方法