使用 Spring WebFlux 和 Reactive MongoDB 构建 Reactive Rest API

x33g5p2x  于2021-10-20 转载在 Go  
字(9.4k)|赞(0)|评价(0)|浏览(660)

Spring 5 通过引入一个名为 Spring WebFlux 的全新响应式框架来拥抱响应式编程范式。

Spring WebFlux 是一个自底向上的异步框架。它可以使用 Servlet 3.1 非阻塞 IO API 以及其他异步运行时环境(例如 netty 或 undertow)在 Servlet 容器上运行。

它将与 Spring MVC 一起使用。是的,Spring MVC 不会去任何地方。它是一种流行的 Web 框架,开发人员已经使用了很长时间。

但是您现在可以在新的反应式框架和传统的 Spring MVC 之间做出选择。您可以根据您的用例选择使用其中的任何一个。

Spring WebFlux 使用一个名为 Reactor 的库来提供响应式支持。 Reactor 是 Reactive Streams 规范的一个实现。

Reactor 提供了两种主要类型,称为 FluxMono。这两种类型都实现了 Reactive Streams 提供的 Publisher 接口。 Flux 用于表示 0..N 个元素的流,Mono 用于表示 0..1 个元素的流。

尽管 Spring 使用 Reactor 作为其大部分内部 API 的核心依赖项,但它也支持在应用程序级别使用 RxJava。

Spring WebFlux 支持的编程模型

Spring WebFlux 支持两种类型的编程模型:

传统的基于注解的模型,带有 @Controller@RequestMapping 和您在 Spring MVC 中一直使用的其他注解。
1.
基于 Java 8 lambdas 的全新功能样式模型,用于路由和处理请求。

在本文中,我们将使用传统的基于注解的编程模型。我会在以后的文章中写到函数式风格模型。

让我们在 Spring Boot 中构建一个响应式 Restful 服务

在本文中,我们将为一个迷你推特应用程序构建一个 Restful API。该应用程序将只有一个名为 Tweet 的域模型。每个 Tweet 都会有一个 text 和一个 createdAt 字段。

我们将使用 MongoDB 作为我们的数据存储以及响应式 mongodb 驱动程序。我们将构建用于创建、检索、更新和删除推文的 REST API。所有 REST API 都将是异步的,并将返回一个发布者。

我们还将学习如何将数据从数据库传输到客户端。

最后,我们将使用 Spring 5 提供的新异步 WebTestClient 编写集成测试来测试所有 API。

创建项目

让我们使用 Spring Initializr Web 应用程序来生成我们的应用程序。按照以下步骤生成项目 -

  1. 前往http://start.spring.io
  2. 输入 Artifact 的值作为 webflux-demo
    1.设置包名为com.example.webfluxdemo
  3. 添加Reactive WebReactive MongoDBValidation 依赖
  4. 点击Generate生成并下载Project。

下载项目后,将其解压缩并将其导入您喜欢的 IDE。项目的目录结构应该是这样的——

配置 MongoDB

您可以通过简单地将以下属性添加到 application.properties 文件来配置 MongoDB -

spring.data.mongodb.uri=mongodb://localhost:27017/webflux_demo

Spring Boot 将在启动时读取此配置并自动配置数据源。

创建领域模型

让我们创建我们的域模型 - Tweet。在 com.example.webfluxdemo 包中创建一个名为 model 的新包,然后创建一个名为 Tweet.java 的文件,内容如下:

package com.example.webfluxdemo.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;

@Document(collection = "tweets")
public class Tweet {
    @Id
    private String id;

    @NotBlank
    @Size(max = 140)
    private String text;

    @NotNull
    private Date createdAt = new Date();

    public Tweet() {

    }

    public Tweet(String text) {
        this.id = id;
        this.text = text;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(Date createdAt) {
        this.createdAt = createdAt;
    }
}

够简单! Tweet 模型包含一个 text 和一个 createdAt 字段。 text 字段使用 @NotBlank@Size 批注,以确保它不为空且最多 140 个字符。

创建存储库

接下来,我们将创建用于访问 MongoDB 数据库的数据访问层。在 com.example.webfluxdemo 中创建一个名为 repository 的新包,然后创建一个名为 TweetRepository.java 的新文件,内容如下:

package com.example.webfluxdemo.repository;

import com.example.webfluxdemo.model.Tweet;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {

}

TweetRepository 接口从 ReactiveMongoRepository 扩展而来,它在 Document 上公开了各种 CRUD 方法。

Spring Boot 在运行时自动插入一个名为 SimpleReactiveMongoRepository 的接口实现。

因此,您无需编写任何代码即可轻松使用 Document 上的所有 CRUD 方法。以下是 SimpleReactiveMongoRepository 提供的一些方法 -

reactor.core.publisher.Flux<T>  findAll(); 

reactor.core.publisher.Mono<T>  findById(ID id); 

<S extends T> reactor.core.publisher.Mono<S>  save(S entity); 

reactor.core.publisher.Mono<Void>   delete(T entity);

请注意,所有方法都是异步的,并以 FluxMono 类型的形式返回发布者。

创建控制器端点

最后,让我们编写将暴露给客户端的 API。在 com.example.webfluxdemo 中创建一个名为 controller 的新包,然后创建一个名为 TweetController.java 的新文件,内容如下:

package com.example.webfluxdemo.controller;

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;

@RestController
public class TweetController {

    @Autowired
    private TweetRepository tweetRepository;

    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        return tweetRepository.findAll();
    }

    @PostMapping("/tweets")
    public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet) {
        return tweetRepository.save(tweet);
    }

    @GetMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
        return tweetRepository.findById(tweetId)
                .map(savedTweet -> ResponseEntity.ok(savedTweet))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PutMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
                                                   @Valid @RequestBody Tweet tweet) {
        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet -> {
                    existingTweet.setText(tweet.getText());
                    return tweetRepository.save(existingTweet);
                })
                .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @DeleteMapping("/tweets/{id}")
    public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {

        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet ->
                        tweetRepository.delete(existingTweet)
                            .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                )
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    // Tweets are Sent to the client as Server Sent Events
    @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Tweet> streamAllTweets() {
        return tweetRepository.findAll();
    }
}

所有控制器端点都以 Flux 或 Mono 的形式返回发布者。最后一个端点非常有趣,我们将内容类型设置为 text/event-stream。它以 Server Sent Events 的形式将推文发送到这样的浏览器 -

data: {"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
data: {"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

既然我们在谈论事件流,你可能会问下面的端点是不是也返回一个流?

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    return tweetRepository.findAll();
}

答案是肯定的。 Flux<Tweet> 表示推文流。但是,默认情况下,它会生成一个 JSON 数组,因为如果将单个 JSON 对象的流发送到浏览器,那么它作为一个整体将不是有效的 JSON 文档。除了使用 Server-Sent-Events 或 WebSocket 之外,浏览器客户端无法使用流。

但是,非浏览器客户端可以通过将 Accept 标头设置为 application/stream+json 来请求 JSON 流,并且响应将是类似于 Server-Sent-Events 的 JSON 流,但没有额外的格式:

{"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
{"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

与 WebTestClient 的集成测试

Spring 5 还提供了一个名为 WebClient 的异步和反应式 http 客户端,用于处理异步和流 API。它是 RestTemplate 的反应式替代品。

此外,您还可以获得用于编写集成测试的 WebTestClient。测试客户端可以在实时服务器上运行,也可以与模拟请求和响应一起使用。

我们将使用 WebTestClient 为我们的 REST API 编写集成测试。打开 WebfluxDemoApplicationTests.java 文件并向其中添加以下测试 -

package com.example.webfluxdemo;

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.util.Collections;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebfluxDemoApplicationTests {

	@Autowired
	private WebTestClient webTestClient;

	@Autowired
    TweetRepository tweetRepository;

	@Test
	public void testCreateTweet() {
		Tweet tweet = new Tweet("This is a Test Tweet");

		webTestClient.post().uri("/tweets")
				.contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(Mono.just(tweet), Tweet.class)
				.exchange()
				.expectStatus().isOk()
				.expectHeader().contentType(MediaType.APPLICATION_JSON)
				.expectBody()
                .jsonPath("$.id").isNotEmpty()
                .jsonPath("$.text").isEqualTo("This is a Test Tweet");
	}

	@Test
    public void testGetAllTweets() {
	    webTestClient.get().uri("/tweets")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON)
                .expectBodyList(Tweet.class);
    }

    @Test
    public void testGetSingleTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();

        webTestClient.get()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .consumeWith(response ->
                        Assertions.assertThat(response.getResponseBody()).isNotNull());
    }

    @Test
    public void testUpdateTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();

        Tweet newTweetData = new Tweet("Updated Tweet");

        webTestClient.put()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .body(Mono.just(newTweetData), Tweet.class)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON)
                .expectBody()
                .jsonPath("$.text").isEqualTo("Updated Tweet");
    }

    @Test
    public void testDeleteTweet() {
	    Tweet tweet = tweetRepository.save(new Tweet("To be deleted")).block();

	    webTestClient.delete()
                .uri("/tweets/{id}", Collections.singletonMap("id",  tweet.getId()))
                .exchange()
                .expectStatus().isOk();
    }
}

在上面的示例中,我为所有 CRUD API 编写了测试。您可以通过转到项目的根目录并键入 mvn test 来运行测试。

### 结论

在本文中,我们学习了 Spring 响应式编程的基础知识,并利用 Spring WebFlux 框架提供的响应式支持构建了一个简单的 Restful 服务。我们还使用 WebTestClient 测试了所有 Rest API。

您可以在 my github repository 中找到我们在本文中构建的应用程序的完整代码。请随意 fork 项目并对其进行处理。

相关文章