Usage and code examples of the reactor.core.publisher.Mono.fromRunnable() method

x33g5p2x, reposted on 2022-01-24 under "Other"

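This article collects code examples for the Java method reactor.core.publisher.Mono.fromRunnable() and shows how Mono.fromRunnable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they are useful as reference material. The details of Mono.fromRunnable() are as follows:
Package: reactor.core.publisher
Class: Mono
Method: fromRunnable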

Introduction to Mono.fromRunnable

Create a Mono that completes empty once the provided Runnable has been executed.
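
The description above implies two behaviours that are easy to miss: the Runnable runs when the Mono is subscribed to, not when it is created, and the Mono emits no value. The following is a minimal sketch of both, assuming reactor-core is on the classpath. The class name FromRunnableSketch, the counter field, and the incrementLater() helper are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class FromRunnableSketch {

    // Hypothetical side effect: a counter that can be inspected from outside.
    static final AtomicInteger counter = new AtomicInteger();

    // Building the Mono only describes the work; the Runnable is not run here.
    static Mono<Void> incrementLater() {
        return Mono.fromRunnable(counter::incrementAndGet);
    }

    public static void main(String[] args) {
        Mono<Void> mono = incrementLater();
        System.out.println("after creation: " + counter.get());

        // block() subscribes, runs the Runnable, and returns null because
        // the Mono completes without emitting a value.
        Void result = mono.block();
        System.out.println("result: " + result + ", counter: " + counter.get());

        // Each new subscription runs the Runnable again.
        mono.block();
        System.out.println("after second subscription: " + counter.get());
    }
}
```

Because the Mono never emits a value, it is typically typed as Mono<Void> and chained with operators such as then(...), as several of the project examples below do.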

Code examples

Example source: codecentric/spring-boot-admin

@Override
public Mono<Void> append(List<InstanceEvent> events) {
  return Mono.fromRunnable(() -> {
    while (true) {
      if (doAppend(events)) {
        return;
      }
    }
  });
}

Example source: spring-projects/spring-framework

private Mono<Void> stopScheduler() {
  return Mono.fromRunnable(() -> {
    this.scheduler.dispose();
    for (int i = 0; i < 20; i++) {
      if (this.scheduler.isDisposed()) {
        break;
      }
      try {
        Thread.sleep(100);
      }
      catch (Throwable ex) {
        break;
      }
    }
  });
}

Example source: spring-projects/spring-framework

/**
 * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the
 * request headers/cookies, and write the request body.
 * @param writeAction the action to write the request body (may be {@code null})
 * @return a completion publisher
 */
protected Mono<Void> doCommit(@Nullable Supplier<? extends Publisher<Void>> writeAction) {
  if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
    return Mono.empty();
  }
  this.commitActions.add(() ->
      Mono.fromRunnable(() -> {
        applyHeaders();
        applyCookies();
        this.state.set(State.COMMITTED);
      }));
  if (writeAction != null) {
    this.commitActions.add(writeAction);
  }
  List<? extends Publisher<Void>> actions = this.commitActions.stream()
      .map(Supplier::get).collect(Collectors.toList());
  return Flux.concat(actions).then();
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.APPLICATION_JSON);
  // Let's Chat requires the token as the basic-auth username; the password can be an arbitrary string.
  String auth = Base64Utils.encodeToString(String.format("%s:%s", token, username)
                          .getBytes(StandardCharsets.UTF_8));
  headers.add(HttpHeaders.AUTHORIZATION, String.format("Basic %s", auth));
  return Mono.fromRunnable(() -> restTemplate.exchange(createUrl(),
    HttpMethod.POST,
    new HttpEntity<>(createMessage(event, instance), headers),
    Void.class
  ));
}

Example source: spring-projects/spring-security

@Override
public Mono<Void> saveAuthorizedClient(OAuth2AuthorizedClient authorizedClient, Authentication principal) {
  Assert.notNull(authorizedClient, "authorizedClient cannot be null");
  Assert.notNull(principal, "principal cannot be null");
  return Mono.fromRunnable(() -> {
    String identifier = this.getIdentifier(authorizedClient.getClientRegistration(), principal.getName());
    this.authorizedClients.put(identifier, authorizedClient);
  });
}

Example source: spring-projects/spring-framework

/**
 * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the
 * response status and headers/cookies, and write the response body.
 * @param writeAction the action to write the response body (may be {@code null})
 * @return a completion publisher
 */
protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) {
  if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
    return Mono.empty();
  }
  this.commitActions.add(() ->
      Mono.fromRunnable(() -> {
        applyStatusCode();
        applyHeaders();
        applyCookies();
        this.state.set(State.COMMITTED);
      }));
  if (writeAction != null) {
    this.commitActions.add(writeAction);
  }
  List<? extends Mono<Void>> actions = this.commitActions.stream()
      .map(Supplier::get).collect(Collectors.toList());
  return Flux.concat(actions).then();
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return Mono.fromRunnable(
    () -> restTemplate.getForObject(buildUrl(), Void.class, createMessage(event, instance)));
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return Mono.fromRunnable(
    () -> restTemplate.exchange(buildUrl(event, instance), HttpMethod.POST, createRequest(event, instance),
      Void.class));
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return Mono.fromRunnable(
    () -> restTemplate.postForEntity(url, createPagerdutyEvent(event, instance), Void.class));
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  if (webhookUrl == null) {
    return Mono.error(new IllegalStateException("'webhookUrl' must not be null."));
  }
  return Mono.fromRunnable(() -> restTemplate.postForEntity(
    webhookUrl,
    createDiscordNotification(event, instance),
    Void.class
  ));
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  if (webhookUrl == null) {
    return Mono.error(new IllegalStateException("'webhookUrl' must not be null."));
  }
  return Mono.fromRunnable(
    () -> restTemplate.postForEntity(webhookUrl, createMessage(event, instance), Void.class));
}

Example source: forezp/SpringCloudLearning

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
  return chain.filter(exchange).then(
      Mono.fromRunnable(() -> {
        Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);
        if (startTime != null) {
          log.info(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
        }
      })
  );
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return Mono.fromRunnable(() -> restTemplate.postForEntity(buildUrl(),
    createHipChatNotification(event, instance),
    Void.class
  ));
}

Example source: codecentric/spring-boot-admin

@Override
public Mono<Void> append(List<InstanceEvent> events) {
  return super.append(events).then(Mono.fromRunnable(() -> this.publish(events)));
}

Example source: codecentric/spring-boot-admin

@Override
public Mono<Void> notify(InstanceEvent event) {
  return super.notify(event).then(Mono.fromRunnable(() -> updateLastStatus(event)));
}

Example source: codecentric/spring-boot-admin

@Override
public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return delegate.notify(event).onErrorResume(error -> Mono.empty()).then(Mono.fromRunnable(() -> {
    if (shouldEndReminder(event)) {
      reminders.remove(event.getInstance());
    } else if (shouldStartReminder(event)) {
      reminders.putIfAbsent(event.getInstance(), new Reminder(event));
    }
  }));
}

Example source: codecentric/spring-boot-admin

@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
  return Mono.fromRunnable(() -> {
    if (event instanceof InstanceStatusChangedEvent) {
      LOGGER.info("Instance {} ({}) is {}", instance.getRegistration().getName(), event.getInstance(),
        ((InstanceStatusChangedEvent) event).getStatusInfo().getStatus());
    } else {
      LOGGER.info("Instance {} ({}) {}", instance.getRegistration().getName(), event.getInstance(),
        event.getType());
    }
  });
}

Example source: codecentric/spring-boot-admin

protected Mono<Void> updateSnapshot(InstanceEvent event) {
  return Mono.<Void>fromRunnable(() -> snapshots.compute(event.getInstance(), (key, old) -> {
    Instance instance = old != null ? old : Instance.create(key);
    return instance.apply(event);
  })).onErrorResume(ex -> {
    log.warn(
      "Error while updating the snapshot with event {}. Recomputing instance snapshot from event history.",
      event,
      ex
    );
    return recomputeSnapshot(event.getInstance());
  });
}

Example source: spring-projects/spring-framework

@Test
public void beforeCommitWithComplete() throws Exception {
  ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
  TestServerHttpResponse response = new TestServerHttpResponse();
  response.beforeCommit(() -> Mono.fromRunnable(() -> response.getCookies().add(cookie.getName(), cookie)));
  response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).block();
  assertTrue(response.statusCodeWritten);
  assertTrue(response.headersWritten);
  assertTrue(response.cookiesWritten);
  assertSame(cookie, response.getCookies().getFirst("ID"));
  assertEquals(3, response.body.size());
  assertEquals("a", new String(response.body.get(0).asByteBuffer().array(), StandardCharsets.UTF_8));
  assertEquals("b", new String(response.body.get(1).asByteBuffer().array(), StandardCharsets.UTF_8));
  assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), StandardCharsets.UTF_8));
}

Example source: spring-projects/spring-framework

@Override
public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
    Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
  MonoProcessor<ClientHttpResponse> result = MonoProcessor.create();
  MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
  MockServerHttpResponse mockServerResponse = new MockServerHttpResponse();
  mockClientRequest.setWriteHandler(requestBody -> {
    log("Invoking HttpHandler for ", httpMethod, uri);
    ServerHttpRequest mockServerRequest = adaptRequest(mockClientRequest, requestBody);
    ServerHttpResponse responseToUse = prepareResponse(mockServerResponse, mockServerRequest);
    this.handler.handle(mockServerRequest, responseToUse).subscribe(aVoid -> {}, result::onError);
    return Mono.empty();
  });
  mockServerResponse.setWriteHandler(responseBody ->
      Mono.fromRunnable(() -> {
        log("Creating client response for ", httpMethod, uri);
        result.onNext(adaptResponse(mockServerResponse, responseBody));
      }));
  log("Writing client request for ", httpMethod, uri);
  requestCallback.apply(mockClientRequest).subscribe(aVoid -> {}, result::onError);
  return result;
}
