Spring 5 通过引入一个名为 Spring WebFlux 的全新响应式框架来拥抱响应式编程范式。
Spring WebFlux 是一个自底向上的异步框架。它可以使用 Servlet 3.1 非阻塞 IO API 以及其他异步运行时环境(例如 netty 或 undertow)在 Servlet 容器上运行。
它将与 Spring MVC 一起使用。是的,Spring MVC 不会去任何地方。它是一种流行的 Web 框架,开发人员已经使用了很长时间。
但是您现在可以在新的反应式框架和传统的 Spring MVC 之间做出选择。您可以根据您的用例选择使用其中的任何一个。
Spring WebFlux 使用一个名为 Reactor 的库来提供响应式支持。 Reactor 是 Reactive Streams 规范的一个实现。
Reactor 提供了两种主要类型,称为 Flux
和 Mono
。这两种类型都实现了 Reactive Streams 提供的 Publisher
接口。 Flux
用于表示 0..N 个元素的流,Mono
用于表示 0..1 个元素的流。
尽管 Spring 使用 Reactor 作为其大部分内部 API 的核心依赖项,但它也支持在应用程序级别使用 RxJava。
Spring WebFlux 支持两种类型的编程模型:
传统的基于注解的模型,带有 @Controller
、@RequestMapping
和您在 Spring MVC 中一直使用的其他注解。
1.
基于 Java 8 lambdas 的全新功能样式模型,用于路由和处理请求。
在本文中,我们将使用传统的基于注解的编程模型。我会在以后的文章中写到函数式风格模型。
在本文中,我们将为一个迷你推特应用程序构建一个 Restful API。该应用程序将只有一个名为 Tweet
的域模型。每个 Tweet
都会有一个 text
和一个 createdAt
字段。
我们将使用 MongoDB 作为我们的数据存储以及响应式 mongodb 驱动程序。我们将构建用于创建、检索、更新和删除推文的 REST API。所有 REST API 都将是异步的,并将返回一个发布者。
我们还将学习如何将数据从数据库传输到客户端。
最后,我们将使用 Spring 5 提供的新异步 WebTestClient 编写集成测试来测试所有 API。
让我们使用 Spring Initializr Web 应用程序来生成我们的应用程序。按照以下步骤生成项目 -
下载项目后,将其解压缩并将其导入您喜欢的 IDE。项目的目录结构应该是这样的——
您可以通过简单地将以下属性添加到 application.properties
文件来配置 MongoDB -
spring.data.mongodb.uri=mongodb://localhost:27017/webflux_demo
Spring Boot 将在启动时读取此配置并自动配置数据源。
让我们创建我们的域模型 - Tweet
。在 com.example.webfluxdemo
包中创建一个名为 model
的新包,然后创建一个名为 Tweet.java
的文件,内容如下:
package com.example.webfluxdemo.model;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;
@Document(collection = "tweets")
public class Tweet {
@Id
private String id;
@NotBlank
@Size(max = 140)
private String text;
@NotNull
private Date createdAt = new Date();
public Tweet() {
}
public Tweet(String text) {
this.id = id;
this.text = text;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public Date getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Date createdAt) {
this.createdAt = createdAt;
}
}
够简单! Tweet 模型包含一个 text
和一个 createdAt
字段。 text
字段使用 @NotBlank
和 @Size
批注,以确保它不为空且最多 140 个字符。
接下来,我们将创建用于访问 MongoDB 数据库的数据访问层。在 com.example.webfluxdemo
中创建一个名为 repository
的新包,然后创建一个名为 TweetRepository.java
的新文件,内容如下:
package com.example.webfluxdemo.repository;
import com.example.webfluxdemo.model.Tweet;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {
}
TweetRepository
接口从 ReactiveMongoRepository
扩展而来,它在 Document 上公开了各种 CRUD 方法。
Spring Boot 在运行时自动插入一个名为 SimpleReactiveMongoRepository
的接口实现。
因此,您无需编写任何代码即可轻松使用 Document 上的所有 CRUD 方法。以下是 SimpleReactiveMongoRepository
提供的一些方法 -
reactor.core.publisher.Flux<T> findAll();
reactor.core.publisher.Mono<T> findById(ID id);
<S extends T> reactor.core.publisher.Mono<S> save(S entity);
reactor.core.publisher.Mono<Void> delete(T entity);
请注意,所有方法都是异步的,并以 Flux
或 Mono
类型的形式返回发布者。
最后,让我们编写将暴露给客户端的 API。在 com.example.webfluxdemo
中创建一个名为 controller
的新包,然后创建一个名为 TweetController.java
的新文件,内容如下:
package com.example.webfluxdemo.controller;
import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
@RestController
public class TweetController {
@Autowired
private TweetRepository tweetRepository;
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetRepository.findAll();
}
@PostMapping("/tweets")
public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet) {
return tweetRepository.save(tweet);
}
@GetMapping("/tweets/{id}")
public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
return tweetRepository.findById(tweetId)
.map(savedTweet -> ResponseEntity.ok(savedTweet))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PutMapping("/tweets/{id}")
public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
@Valid @RequestBody Tweet tweet) {
return tweetRepository.findById(tweetId)
.flatMap(existingTweet -> {
existingTweet.setText(tweet.getText());
return tweetRepository.save(existingTweet);
})
.map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@DeleteMapping("/tweets/{id}")
public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {
return tweetRepository.findById(tweetId)
.flatMap(existingTweet ->
tweetRepository.delete(existingTweet)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
)
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
// Tweets are Sent to the client as Server Sent Events
@GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> streamAllTweets() {
return tweetRepository.findAll();
}
}
所有控制器端点都以 Flux 或 Mono 的形式返回发布者。最后一个端点非常有趣,我们将内容类型设置为 text/event-stream
。它以 Server Sent Events 的形式将推文发送到这样的浏览器 -
data: {"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
data: {"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}
既然我们在谈论事件流,你可能会问下面的端点是不是也返回一个流?
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetRepository.findAll();
}
答案是肯定的。 Flux<Tweet>
表示推文流。但是,默认情况下,它会生成一个 JSON 数组,因为如果将单个 JSON 对象的流发送到浏览器,那么它作为一个整体将不是有效的 JSON 文档。除了使用 Server-Sent-Events 或 WebSocket 之外,浏览器客户端无法使用流。
但是,非浏览器客户端可以通过将 Accept
标头设置为 application/stream+json
来请求 JSON 流,并且响应将是类似于 Server-Sent-Events 的 JSON 流,但没有额外的格式:
{"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
{"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}
Spring 5 还提供了一个名为 WebClient
的异步和反应式 http 客户端,用于处理异步和流 API。它是 RestTemplate
的反应式替代品。
此外,您还可以获得用于编写集成测试的 WebTestClient
。测试客户端可以在实时服务器上运行,也可以与模拟请求和响应一起使用。
我们将使用 WebTestClient 为我们的 REST API 编写集成测试。打开 WebfluxDemoApplicationTests.java
文件并向其中添加以下测试 -
package com.example.webfluxdemo;
import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import java.util.Collections;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebfluxDemoApplicationTests {
@Autowired
private WebTestClient webTestClient;
@Autowired
TweetRepository tweetRepository;
@Test
public void testCreateTweet() {
Tweet tweet = new Tweet("This is a Test Tweet");
webTestClient.post().uri("/tweets")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(Mono.just(tweet), Tweet.class)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody()
.jsonPath("$.id").isNotEmpty()
.jsonPath("$.text").isEqualTo("This is a Test Tweet");
}
@Test
public void testGetAllTweets() {
webTestClient.get().uri("/tweets")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBodyList(Tweet.class);
}
@Test
public void testGetSingleTweet() {
Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();
webTestClient.get()
.uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
.exchange()
.expectStatus().isOk()
.expectBody()
.consumeWith(response ->
Assertions.assertThat(response.getResponseBody()).isNotNull());
}
@Test
public void testUpdateTweet() {
Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();
Tweet newTweetData = new Tweet("Updated Tweet");
webTestClient.put()
.uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(Mono.just(newTweetData), Tweet.class)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody()
.jsonPath("$.text").isEqualTo("Updated Tweet");
}
@Test
public void testDeleteTweet() {
Tweet tweet = tweetRepository.save(new Tweet("To be deleted")).block();
webTestClient.delete()
.uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
.exchange()
.expectStatus().isOk();
}
}
在上面的示例中,我为所有 CRUD API 编写了测试。您可以通过转到项目的根目录并键入 mvn test
来运行测试。
### 结论
在本文中,我们学习了 Spring 响应式编程的基础知识,并利用 Spring WebFlux 框架提供的响应式支持构建了一个简单的 Restful 服务。我们还使用 WebTestClient 测试了所有 Rest API。
您可以在 my github repository 中找到我们在本文中构建的应用程序的完整代码。请随意 fork 项目并对其进行处理。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.callicoder.com/reactive-rest-apis-spring-webflux-reactive-mongo/
内容来源于网络,如有侵权,请联系作者删除!