SpringWebFlux如何在接收到存储库中的最后一个元素后保持子提交的活动性

j2datikz  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(250)

我想创建一个端点,将数据流到客户端(react app),直到客户端不说“好的,我完成了”。数据是从mongo获取的用户事件列表。获取终结点:

@GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamEvents(@RequestParam userId: String): Flux<UserEvent> {
    return userEventRepository.findAllByUserId(userId)
}

当前实现在接收最后一个元素时关闭订阅。我不想让这种联系维持很长时间。我猜这将是30秒到1分钟。当连接是活动的客户端将创建新的事件使用后端点。

@PostMapping
fun createUserEvent(@RequestBody userEventRequest: UserEventRequest): Mono<UserEvent> {
    return userEventRepository.save(UserEvent(userEventRequest.userId))
}

这只是一个简单的例子。我不想在repo上调用save,而是想使它异步,这样会触发一个事件,处理程序将执行一些计算并将userevent保存到数据库。现在我想在客户端显示创建的事件。我可以用事件流实现吗?或者我需要用一些不同的东西,比如websockets?完美的解决方案是这样的

@GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun streamEvents(@RequestParam userId: String): Flux<UserEvent> {
    return userEventRepository.findAllByUserId(userId).takeUntil { it.status != "FINISHED" }
}
g6ll5ycj

g6ll5ycj1#

我以前用过 ServerSentEvents 对于这种类型的逻辑,通过发送 keep alive 给定时间间隔内的消息。
下面是我以前的一个项目中的一个代码示例,我在其中合并了 keep alive 常规事件旁边的消息。

.GET("", accept(TEXT_EVENT_STREAM), request -> ok()
        .contentType(TEXT_EVENT_STREAM)
        .header("Cache-Control", "no-transform")
        .body(Flux.merge(myHandler.getEvents()), 
                  Flux.interval(Duration.ofSeconds(15))
                          .map(aLong -> ServerSentEvent.builder()
                                                       .comment("keep alive")
                                                       .build())),
        new ParameterizedTypeReference<List<MyEvents>>() {}))

相关问题